source file: /home/buildslave/tahoe/edgy/build/src/allmydata/storage/crawler.py
file stats: 221 lines, 215 executed: 97.3% covered
coverage versus previous test: 0 lines added, 0 lines removed
    1. 
    2. import os, time, struct
    3. import cPickle as pickle
    4. from twisted.internet import reactor
    5. from twisted.application import service
    6. from allmydata.storage.common import si_b2a
    7. from allmydata.util import fileutil
    8. 
    9. class TimeSliceExceeded(Exception):
   10.     pass
   11. 
   12. class ShareCrawler(service.MultiService):
   13.     """A ShareCrawler subclass is attached to a StorageServer, and
   14.     periodically walks all of its shares, processing each one in some
   15.     fashion. This crawl is rate-limited, to reduce the IO burden on the host,
   16.     since large servers can easily have a terabyte of shares, in several
   17.     million files, which can take hours or days to read.
   18. 
   19.     Once the crawler starts a cycle, it will proceed at a rate limited by the
   20.     allowed_cpu_percentage= and cpu_slice= parameters: yielding the reactor
   21.     after it has worked for 'cpu_slice' seconds, and not resuming right away,
   22.     always trying to use less than 'allowed_cpu_percentage'.
   23. 
   24.     Once the crawler finishes a cycle, it will put off starting the next one
   25.     long enough to ensure that 'minimum_cycle_time' elapses between the start
   26.     of two consecutive cycles.
   27. 
   28.     We assume that the normal upload/download/get_buckets traffic of a tahoe
   29.     grid will cause the prefixdir contents to be mostly cached in the kernel,
   30.     or that the number of buckets in each prefixdir will be small enough to
   31.     load quickly. A 1TB allmydata.com server was measured to have 2.56M
   32.     buckets, spread into the 1024 prefixdirs, with about 2500 buckets per
   33.     prefix. On this server, each prefixdir took 130ms-200ms to list the first
   34.     time, and 17ms to list the second time.
   35. 
   36.     To use a crawler, create a subclass which implements the process_bucket()
   37.     method. It will be called with a prefixdir and a base32 storage index
   38.     string. process_bucket() must run synchronously. Any keys added to
   39.     self.state will be preserved. Override add_initial_state() to set up
   40.     initial state keys. Override finished_cycle() to perform additional
   41.     processing when the cycle is complete. Any status that the crawler
   42.     produces should be put in the self.state dictionary. Status renderers
   43.     (like a web page which describes the accomplishments of your crawler)
   44.     will use crawler.get_state() to retrieve this dictionary; they can
   45.     present the contents as they see fit.
   46. 
   47.     Then create an instance, with a reference to a StorageServer and a
   48.     filename where it can store persistent state. The statefile is used to
   49.     keep track of how far around the ring the process has travelled, as well
   50.     as timing history to allow the pace to be predicted and controlled. The
   51.     statefile will be updated and written to disk after each time slice (just
   52.     before the crawler yields to the reactor), and also after each cycle is
   53.     finished, and also when stopService() is called. Note that this means
   54.     that a crawler which is interrupted with SIGKILL while it is in the
   55.     middle of a time slice will lose progress: the next time the node is
   56.     started, the crawler will repeat some unknown amount of work.
   57. 
   58.     The crawler instance must be started with startService() before it will
   59.     do any work. To make it stop doing work, call stopService().
   60.     """
   61. 
   62.     slow_start = 300 # don't start crawling for 5 minutes after startup
   63.     # all three of these can be changed at any time
   64.     allowed_cpu_percentage = .10 # use up to 10% of the CPU, on average
   65.     cpu_slice = 1.0 # use up to 1.0 seconds before yielding
   66.     minimum_cycle_time = 300 # don't run a cycle faster than this
   67. 
   68.     def __init__(self, server, statefile, allowed_cpu_percentage=None):
   69.         service.MultiService.__init__(self)
   70.         if allowed_cpu_percentage is not None:
   71.             self.allowed_cpu_percentage = allowed_cpu_percentage
   72.         self.server = server
   73.         self.sharedir = server.sharedir
   74.         self.statefile = statefile
   75.         self.prefixes = [si_b2a(struct.pack(">H", i << (16-10)))[:2]
   76.                          for i in range(2**10)]
   77.         self.prefixes.sort()
   78.         self.timer = None
   79.         self.bucket_cache = (None, [])
   80.         self.current_sleep_time = None
   81.         self.next_wake_time = None
   82.         self.last_prefix_finished_time = None
   83.         self.last_prefix_elapsed_time = None
   84.         self.last_cycle_started_time = None
   85.         self.last_cycle_elapsed_time = None
   86.         self.load_state()
   87. 
   88.     def minus_or_none(self, a, b):
   89.         if a is None:
   90.             return None
   91.         return a-b
   92. 
   93.     def get_progress(self):
   94.         """I return information about how much progress the crawler is
   95.         making. My return value is a dictionary. The primary key is
   96.         'cycle-in-progress': True if the crawler is currently traversing the
   97.         shares, False if it is idle between cycles.
   98. 
   99.         Note that any of these 'time' keys could be None if I am called at
  100.         certain moments, so application code must be prepared to tolerate
  101.         this case. The estimates will also be None if insufficient data has
  102.         been gatherered to form an estimate.
  103. 
  104.         If cycle-in-progress is True, the following keys will be present::
  105. 
  106.          cycle-complete-percentage': float, from 0.0 to 100.0, indicating how
  107.                                      far the crawler has progressed through
  108.                                      the current cycle
  109.          remaining-sleep-time: float, seconds from now when we do more work
  110.          estimated-cycle-complete-time-left:
  111.                 float, seconds remaining until the current cycle is finished.
  112.                 TODO: this does not yet include the remaining time left in
  113.                 the current prefixdir, and it will be very inaccurate on fast
  114.                 crawlers (which can process a whole prefix in a single tick)
  115.          estimated-time-per-cycle: float, seconds required to do a complete
  116.                                    cycle
  117. 
  118.         If cycle-in-progress is False, the following keys are available::
  119. 
  120.          next-crawl-time: float, seconds-since-epoch when next crawl starts
  121.          remaining-wait-time: float, seconds from now when next crawl starts
  122.          estimated-time-per-cycle: float, seconds required to do a complete
  123.                                    cycle
  124.         """
  125. 
  126.         d = {}
  127. 
  128.         if self.state["current-cycle"] is None:
  129.             d["cycle-in-progress"] = False
  130.             d["next-crawl-time"] = self.next_wake_time
  131.             d["remaining-wait-time"] = self.minus_or_none(self.next_wake_time,
  132.                                                           time.time())
  133.         else:
  134.             d["cycle-in-progress"] = True
  135.             pct = 100.0 * self.last_complete_prefix_index / len(self.prefixes)
  136.             d["cycle-complete-percentage"] = pct
  137.             remaining = None
  138.             if self.last_prefix_elapsed_time is not None:
  139.                 left = len(self.prefixes) - self.last_complete_prefix_index
  140.                 remaining = left * self.last_prefix_elapsed_time
  141.                 # TODO: remainder of this prefix: we need to estimate the
  142.                 # per-bucket time, probably by measuring the time spent on
  143.                 # this prefix so far, divided by the number of buckets we've
  144.                 # processed.
  145.             d["estimated-cycle-complete-time-left"] = remaining
  146.             # it's possible to call get_progress() from inside a crawler's
  147.             # finished_prefix() function
  148.             d["remaining-sleep-time"] = self.minus_or_none(self.next_wake_time,
  149.                                                            time.time())
  150.         per_cycle = None
  151.         if self.last_cycle_elapsed_time is not None:
  152.             per_cycle = self.last_cycle_elapsed_time
  153.         elif self.last_prefix_elapsed_time is not None:
  154.             per_cycle = len(self.prefixes) * self.last_prefix_elapsed_time
  155.         d["estimated-time-per-cycle"] = per_cycle
  156.         return d
  157. 
  158.     def get_state(self):
  159.         """I return the current state of the crawler. This is a copy of my
  160.         state dictionary.
  161. 
  162.         If we are not currently sleeping (i.e. get_state() was called from
  163.         inside the process_prefixdir, process_bucket, or finished_cycle()
  164.         methods, or if startService has not yet been called on this crawler),
  165.         these two keys will be None.
  166. 
  167.         Subclasses can override this to add computed keys to the return value,
  168.         but don't forget to start with the upcall.
  169.         """
  170.         state = self.state.copy() # it isn't a deepcopy, so don't go crazy
  171.         return state
  172. 
  173.     def load_state(self):
  174.         # we use this to store state for both the crawler's internals and
  175.         # anything the subclass-specific code needs. The state is stored
  176.         # after each bucket is processed, after each prefixdir is processed,
  177.         # and after a cycle is complete. The internal keys we use are:
  178.         #  ["version"]: int, always 1
  179.         #  ["last-cycle-finished"]: int, or None if we have not yet finished
  180.         #                           any cycle
  181.         #  ["current-cycle"]: int, or None if we are sleeping between cycles
  182.         #  ["current-cycle-start-time"]: int, seconds-since-epoch of when this
  183.         #                                cycle was started, possibly by an earlier
  184.         #                                process
  185.         #  ["last-complete-prefix"]: str, two-letter name of the last prefixdir
  186.         #                            that was fully processed, or None if we
  187.         #                            are sleeping between cycles, or if we
  188.         #                            have not yet finished any prefixdir since
  189.         #                            a cycle was started
  190.         #  ["last-complete-bucket"]: str, base32 storage index bucket name
  191.         #                            of the last bucket to be processed, or
  192.         #                            None if we are sleeping between cycles
  193.         try:
  194.             f = open(self.statefile, "rb")
  195.             state = pickle.load(f)
  196.             f.close()
  197.         except EnvironmentError:
  198.             state = {"version": 1,
  199.                      "last-cycle-finished": None,
  200.                      "current-cycle": None,
  201.                      "last-complete-prefix": None,
  202.                      "last-complete-bucket": None,
  203.                      }
  204.         state.setdefault("current-cycle-start-time", time.time()) # approximate
  205.         self.state = state
  206.         lcp = state["last-complete-prefix"]
  207.         if lcp == None:
  208.             self.last_complete_prefix_index = -1
  209.         else:
  210.             self.last_complete_prefix_index = self.prefixes.index(lcp)
  211.         self.add_initial_state()
  212. 
  213.     def add_initial_state(self):
  214.         """Hook method to add extra keys to self.state when first loaded.
  215. 
  216.         The first time this Crawler is used, or when the code has been
  217.         upgraded, the saved state file may not contain all the keys you
  218.         expect. Use this method to add any missing keys. Simply modify
  219.         self.state as needed.
  220. 
  221.         This method for subclasses to override. No upcall is necessary.
  222.         """
  223.         pass
  224. 
  225.     def save_state(self):
  226.         lcpi = self.last_complete_prefix_index
  227.         if lcpi == -1:
  228.             last_complete_prefix = None
  229.         else:
  230.             last_complete_prefix = self.prefixes[lcpi]
  231.         self.state["last-complete-prefix"] = last_complete_prefix
  232.         tmpfile = self.statefile + ".tmp"
  233.         f = open(tmpfile, "wb")
  234.         pickle.dump(self.state, f)
  235.         f.close()
  236.         fileutil.move_into_place(tmpfile, self.statefile)
  237. 
  238.     def startService(self):
  239.         # arrange things to look like we were just sleeping, so
  240.         # status/progress values work correctly
  241.         self.sleeping_between_cycles = True
  242.         self.current_sleep_time = self.slow_start
  243.         self.next_wake_time = time.time() + self.slow_start
  244.         self.timer = reactor.callLater(self.slow_start, self.start_slice)
  245.         service.MultiService.startService(self)
  246. 
  247.     def stopService(self):
  248.         if self.timer:
  249.             self.timer.cancel()
  250.             self.timer = None
  251.         self.save_state()
  252.         return service.MultiService.stopService(self)
  253. 
  254.     def start_slice(self):
  255.         start_slice = time.time()
  256.         self.timer = None
  257.         self.sleeping_between_cycles = False
  258.         self.current_sleep_time = None
  259.         self.next_wake_time = None
  260.         try:
  261.             s = self.last_complete_prefix_index
  262.             self.start_current_prefix(start_slice)
  263.             finished_cycle = True
  264.         except TimeSliceExceeded:
  265.             finished_cycle = False
  266.         self.save_state()
  267.         if not self.running:
  268.             # someone might have used stopService() to shut us down
  269.             return
  270.         # either we finished a whole cycle, or we ran out of time
  271.         now = time.time()
  272.         this_slice = now - start_slice
  273.         # this_slice/(this_slice+sleep_time) = percentage
  274.         # this_slice/percentage = this_slice+sleep_time
  275.         # sleep_time = (this_slice/percentage) - this_slice
  276.         sleep_time = (this_slice / self.allowed_cpu_percentage) - this_slice
  277.         # if the math gets weird, or a timequake happens, don't sleep
  278.         # forever. Note that this means that, while a cycle is running, we
  279.         # will process at least one bucket every 5 minutes, no matter how
  280.         # long that bucket takes.
  281.         sleep_time = max(0.0, min(sleep_time, 299))
  282.         if finished_cycle:
  283.             # how long should we sleep between cycles? Don't run faster than
  284.             # allowed_cpu_percentage says, but also run faster than
  285.             # minimum_cycle_time
  286.             self.sleeping_between_cycles = True
  287.             sleep_time = max(sleep_time, self.minimum_cycle_time)
  288.         else:
  289.             self.sleeping_between_cycles = False
  290.         self.current_sleep_time = sleep_time # for status page
  291.         self.next_wake_time = now + sleep_time
  292.         self.yielding(sleep_time)
  293.         self.timer = reactor.callLater(sleep_time, self.start_slice)
  294. 
  295.     def start_current_prefix(self, start_slice):
  296.         state = self.state
  297.         if state["current-cycle"] is None:
  298.             self.last_cycle_started_time = time.time()
  299.             state["current-cycle-start-time"] = self.last_cycle_started_time
  300.             if state["last-cycle-finished"] is None:
  301.                 state["current-cycle"] = 0
  302.             else:
  303.                 state["current-cycle"] = state["last-cycle-finished"] + 1
  304.             self.started_cycle(state["current-cycle"])
  305.         cycle = state["current-cycle"]
  306. 
  307.         for i in range(self.last_complete_prefix_index+1, len(self.prefixes)):
  308.             # if we want to yield earlier, just raise TimeSliceExceeded()
  309.             prefix = self.prefixes[i]
  310.             prefixdir = os.path.join(self.sharedir, prefix)
  311.             if i == self.bucket_cache[0]:
  312.                 buckets = self.bucket_cache[1]
  313.             else:
  314.                 try:
  315.                     buckets = os.listdir(prefixdir)
  316.                     buckets.sort()
  317.                 except EnvironmentError:
  318.                     buckets = []
  319.                 self.bucket_cache = (i, buckets)
  320.             self.process_prefixdir(cycle, prefix, prefixdir,
  321.                                    buckets, start_slice)
  322.             self.last_complete_prefix_index = i
  323. 
  324.             now = time.time()
  325.             if self.last_prefix_finished_time is not None:
  326.                 elapsed = now - self.last_prefix_finished_time
  327.                 self.last_prefix_elapsed_time = elapsed
  328.             self.last_prefix_finished_time = now
  329. 
  330.             self.finished_prefix(cycle, prefix)
  331.             if time.time() >= start_slice + self.cpu_slice:
  332.                 raise TimeSliceExceeded()
  333. 
  334.         # yay! we finished the whole cycle
  335.         self.last_complete_prefix_index = -1
  336.         self.last_prefix_finished_time = None # don't include the sleep
  337.         now = time.time()
  338.         if self.last_cycle_started_time is not None:
  339.             self.last_cycle_elapsed_time = now - self.last_cycle_started_time
  340.         state["last-complete-bucket"] = None
  341.         state["last-cycle-finished"] = cycle
  342.         state["current-cycle"] = None
  343.         self.finished_cycle(cycle)
  344.         self.save_state()
  345. 
  346.     def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
  347.         """This gets a list of bucket names (i.e. storage index strings,
  348.         base32-encoded) in sorted order.
  349. 
  350.         You can override this if your crawler doesn't care about the actual
  351.         shares, for example a crawler which merely keeps track of how many
  352.         buckets are being managed by this server.
  353. 
  354.         Subclasses which *do* care about actual bucket should leave this
  355.         method along, and implement process_bucket() instead.
  356.         """
  357. 
  358.         for bucket in buckets:
  359.             if bucket <= self.state["last-complete-bucket"]:
  360.                 continue
  361.             self.process_bucket(cycle, prefix, prefixdir, bucket)
  362.             self.state["last-complete-bucket"] = bucket
  363.             if time.time() >= start_slice + self.cpu_slice:
  364.                 raise TimeSliceExceeded()
  365. 
   366.     # the remaining methods are explicitly for subclasses to implement.
  367. 
  368.     def started_cycle(self, cycle):
  369.         """Notify a subclass that the crawler is about to start a cycle.
  370. 
  371.         This method is for subclasses to override. No upcall is necessary.
  372.         """
  373.         pass
  374. 
  375.     def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
  376.         """Examine a single bucket. Subclasses should do whatever they want
  377.         to do to the shares therein, then update self.state as necessary.
  378. 
  379.         If the crawler is never interrupted by SIGKILL, this method will be
  380.         called exactly once per share (per cycle). If it *is* interrupted,
  381.         then the next time the node is started, some amount of work will be
  382.         duplicated, according to when self.save_state() was last called. By
  383.         default, save_state() is called at the end of each timeslice, and
  384.         after finished_cycle() returns, and when stopService() is called.
  385. 
  386.         To reduce the chance of duplicate work (i.e. to avoid adding multiple
  387.         records to a database), you can call save_state() at the end of your
  388.         process_bucket() method. This will reduce the maximum duplicated work
  389.         to one bucket per SIGKILL. It will also add overhead, probably 1-20ms
  390.         per bucket (and some disk writes), which will count against your
  391.         allowed_cpu_percentage, and which may be considerable if
  392.         process_bucket() runs quickly.
  393. 
  394.         This method is for subclasses to override. No upcall is necessary.
  395.         """
  396.         pass
  397. 
  398.     def finished_prefix(self, cycle, prefix):
  399.         """Notify a subclass that the crawler has just finished processing a
  400.         prefix directory (all buckets with the same two-character/10bit
  401.         prefix). To impose a limit on how much work might be duplicated by a
  402.         SIGKILL that occurs during a timeslice, you can call
  403.         self.save_state() here, but be aware that it may represent a
  404.         significant performance hit.
  405. 
  406.         This method is for subclasses to override. No upcall is necessary.
  407.         """
  408.         pass
  409. 
  410.     def finished_cycle(self, cycle):
  411.         """Notify subclass that a cycle (one complete traversal of all
  412.         prefixdirs) has just finished. 'cycle' is the number of the cycle
  413.         that just finished. This method should perform summary work and
  414.         update self.state to publish information to status displays.
  415. 
  416.         One-shot crawlers, such as those used to upgrade shares to a new
  417.         format or populate a database for the first time, can call
  418.         self.stopService() (or more likely self.disownServiceParent()) to
  419.         prevent it from running a second time. Don't forget to set some
  420.         persistent state so that the upgrader won't be run again the next
  421.         time the node is started.
  422. 
  423.         This method is for subclasses to override. No upcall is necessary.
  424.         """
  425.         pass
  426. 
  427.     def yielding(self, sleep_time):
  428.         """The crawler is about to sleep for 'sleep_time' seconds. This
  429.         method is mostly for the convenience of unit tests.
  430. 
  431.         This method is for subclasses to override. No upcall is necessary.
  432.         """
  433.         pass
  434. 
  435. 
  436. class BucketCountingCrawler(ShareCrawler):
  437.     """I keep track of how many buckets are being managed by this server.
  438.     This is equivalent to the number of distributed files and directories for
  439.     which I am providing storage. The actual number of files+directories in
  440.     the full grid is probably higher (especially when there are more servers
  441.     than 'N', the number of generated shares), because some files+directories
  442.     will have shares on other servers instead of me. Also note that the
  443.     number of buckets will differ from the number of shares in small grids,
  444.     when more than one share is placed on a single server.
  445.     """
  446. 
  447.     minimum_cycle_time = 60*60 # we don't need this more than once an hour
  448. 
  449.     def __init__(self, server, statefile, num_sample_prefixes=1):
  450.         ShareCrawler.__init__(self, server, statefile)
  451.         self.num_sample_prefixes = num_sample_prefixes
  452. 
  453.     def add_initial_state(self):
  454.         # ["bucket-counts"][cyclenum][prefix] = number
  455.         # ["last-complete-cycle"] = cyclenum # maintained by base class
  456.         # ["last-complete-bucket-count"] = number
  457.         # ["storage-index-samples"][prefix] = (cyclenum,
  458.         #                                      list of SI strings (base32))
  459.         self.state.setdefault("bucket-counts", {})
  460.         self.state.setdefault("last-complete-bucket-count", None)
  461.         self.state.setdefault("storage-index-samples", {})
  462. 
  463.     def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
  464.         # we override process_prefixdir() because we don't want to look at
  465.         # the individual buckets. We'll save state after each one. On my
  466.         # laptop, a mostly-empty storage server can process about 70
  467.         # prefixdirs in a 1.0s slice.
  468.         if cycle not in self.state["bucket-counts"]:
  469.             self.state["bucket-counts"][cycle] = {}
  470.         self.state["bucket-counts"][cycle][prefix] = len(buckets)
  471.         if prefix in self.prefixes[:self.num_sample_prefixes]:
  472.             self.state["storage-index-samples"][prefix] = (cycle, buckets)
  473. 
  474.     def finished_cycle(self, cycle):
  475.         last_counts = self.state["bucket-counts"].get(cycle, [])
  476.         if len(last_counts) == len(self.prefixes):
  477.             # great, we have a whole cycle.
  478.             num_buckets = sum(last_counts.values())
  479.             self.state["last-complete-bucket-count"] = num_buckets
  480.             # get rid of old counts
  481.             for old_cycle in list(self.state["bucket-counts"].keys()):
  482.                 if old_cycle != cycle:
  483.                     del self.state["bucket-counts"][old_cycle]
  484.         # get rid of old samples too
  485.         for prefix in list(self.state["storage-index-samples"].keys()):
  486.             old_cycle,buckets = self.state["storage-index-samples"][prefix]
  487.             if old_cycle != cycle:
  488.                 del self.state["storage-index-samples"][prefix]
  489.