source file: /home/buildslave/tahoe/edgy/build/src/allmydata/storage/crawler.py
file stats: 221 lines, 215 executed: 97.3% covered
coverage versus previous test: 0 lines added, 0 lines removed
1.
2. import os, time, struct
3. import cPickle as pickle
4. from twisted.internet import reactor
5. from twisted.application import service
6. from allmydata.storage.common import si_b2a
7. from allmydata.util import fileutil
8.
class TimeSliceExceeded(Exception):
    """Signal that the crawler has used up its current time slice.

    Raised from inside the crawl loop once 'cpu_slice' seconds have elapsed,
    and caught by ShareCrawler.start_slice(), which then saves state and
    schedules the next slice.
    """
11.
class ShareCrawler(service.MultiService):
    """A ShareCrawler subclass is attached to a StorageServer, and
    periodically walks all of its shares, processing each one in some
    fashion. This crawl is rate-limited, to reduce the IO burden on the host,
    since large servers can easily have a terabyte of shares, in several
    million files, which can take hours or days to read.

    Once the crawler starts a cycle, it will proceed at a rate limited by the
    allowed_cpu_percentage= and cpu_slice= parameters: yielding the reactor
    after it has worked for 'cpu_slice' seconds, and not resuming right away,
    always trying to use less than 'allowed_cpu_percentage'.

    Once the crawler finishes a cycle, it will put off starting the next one
    for at least 'minimum_cycle_time' seconds, measured from the end of the
    cycle that just finished (longer if allowed_cpu_percentage demands it).

    We assume that the normal upload/download/get_buckets traffic of a tahoe
    grid will cause the prefixdir contents to be mostly cached in the kernel,
    or that the number of buckets in each prefixdir will be small enough to
    load quickly. A 1TB allmydata.com server was measured to have 2.56M
    buckets, spread into the 1024 prefixdirs, with about 2500 buckets per
    prefix. On this server, each prefixdir took 130ms-200ms to list the first
    time, and 17ms to list the second time.

    To use a crawler, create a subclass which implements the process_bucket()
    method. It will be called with the cycle number, the two-character
    prefix, the prefixdir path, and a base32 storage index string.
    process_bucket() must run synchronously. Any keys added to self.state
    will be preserved. Override add_initial_state() to set up initial state
    keys. Override started_cycle(), finished_prefix() and finished_cycle()
    to perform additional processing at those points. Any status that the
    crawler produces should be put in the self.state dictionary. Status
    renderers (like a web page which describes the accomplishments of your
    crawler) will use crawler.get_state() to retrieve this dictionary; they
    can present the contents as they see fit.

    Then create an instance, with a reference to a StorageServer and a
    filename where it can store persistent state. The statefile is used to
    keep track of how far around the ring the process has travelled, as well
    as timing history to allow the pace to be predicted and controlled. The
    statefile will be updated and written to disk after each time slice (just
    before the crawler yields to the reactor), and also after each cycle is
    finished, and also when stopService() is called. Note that this means
    that a crawler which is interrupted with SIGKILL while it is in the
    middle of a time slice will lose progress: the next time the node is
    started, the crawler will repeat some unknown amount of work.

    The crawler instance must be started with startService() before it will
    do any work. To make it stop doing work, call stopService().
    """

    slow_start = 300 # don't start crawling for 5 minutes after startup
    # all three of these can be changed at any time
    allowed_cpu_percentage = .10 # use up to 10% of the CPU, on average
    cpu_slice = 1.0 # use up to 1.0 seconds before yielding
    minimum_cycle_time = 300 # sleep at least this long after each cycle

    def __init__(self, server, statefile, allowed_cpu_percentage=None):
        service.MultiService.__init__(self)
        if allowed_cpu_percentage is not None:
            self.allowed_cpu_percentage = allowed_cpu_percentage
        self.server = server
        self.sharedir = server.sharedir
        self.statefile = statefile
        # 2**10 prefixes: each 10-bit value is placed in the top bits of a
        # 16-bit big-endian string, base32-encoded, and truncated to the
        # first two characters.
        self.prefixes = [si_b2a(struct.pack(">H", i << (16-10)))[:2]
                         for i in range(2**10)]
        self.prefixes.sort()
        self.timer = None
        # (prefix index, sorted bucket list) for the most recently listed
        # prefixdir, so a prefix interrupted mid-way isn't re-listed.
        self.bucket_cache = (None, [])
        self.current_sleep_time = None
        self.next_wake_time = None
        self.last_prefix_finished_time = None
        self.last_prefix_elapsed_time = None
        self.last_cycle_started_time = None
        self.last_cycle_elapsed_time = None
        self.load_state()

    def minus_or_none(self, a, b):
        """Return a-b, or None if 'a' is None."""
        if a is None:
            return None
        return a-b

    def get_progress(self):
        """I return information about how much progress the crawler is
        making. My return value is a dictionary. The primary key is
        'cycle-in-progress': True if the crawler is currently traversing the
        shares, False if it is idle between cycles.

        Note that any of these 'time' keys could be None if I am called at
        certain moments, so application code must be prepared to tolerate
        this case. The estimates will also be None if insufficient data has
        been gathered to form an estimate.

        If cycle-in-progress is True, the following keys will be present::

         cycle-complete-percentage: float, from 0.0 to 100.0, indicating how
                                    far the crawler has progressed through
                                    the current cycle (the fraction of
                                    prefixdirs fully processed)
         remaining-sleep-time: float, seconds from now when we do more work
         estimated-cycle-complete-time-left:
                float, seconds remaining until the current cycle is finished.
                TODO: this does not yet include the remaining time left in
                the current prefixdir, and it will be very inaccurate on fast
                crawlers (which can process a whole prefix in a single tick)
         estimated-time-per-cycle: float, seconds required to do a complete
                                   cycle

        If cycle-in-progress is False, the following keys are available::

         next-crawl-time: float, seconds-since-epoch when next crawl starts
         remaining-wait-time: float, seconds from now when next crawl starts
         estimated-time-per-cycle: float, seconds required to do a complete
                                   cycle
        """

        d = {}

        if self.state["current-cycle"] is None:
            d["cycle-in-progress"] = False
            d["next-crawl-time"] = self.next_wake_time
            d["remaining-wait-time"] = self.minus_or_none(self.next_wake_time,
                                                          time.time())
        else:
            d["cycle-in-progress"] = True
            # last_complete_prefix_index is the *index* of the last fully
            # processed prefixdir (-1 when none are done yet), so the number
            # of completed prefixdirs is one more than that. Using the raw
            # index would report a negative percentage at the start of a
            # cycle and never reach 100%.
            completed = self.last_complete_prefix_index + 1
            pct = 100.0 * completed / len(self.prefixes)
            d["cycle-complete-percentage"] = pct
            remaining = None
            if self.last_prefix_elapsed_time is not None:
                left = len(self.prefixes) - completed
                remaining = left * self.last_prefix_elapsed_time
                # TODO: remainder of this prefix: we need to estimate the
                # per-bucket time, probably by measuring the time spent on
                # this prefix so far, divided by the number of buckets we've
                # processed.
            d["estimated-cycle-complete-time-left"] = remaining
            # it's possible to call get_progress() from inside a crawler's
            # finished_prefix() function
            d["remaining-sleep-time"] = self.minus_or_none(self.next_wake_time,
                                                           time.time())
        per_cycle = None
        if self.last_cycle_elapsed_time is not None:
            per_cycle = self.last_cycle_elapsed_time
        elif self.last_prefix_elapsed_time is not None:
            per_cycle = len(self.prefixes) * self.last_prefix_elapsed_time
        d["estimated-time-per-cycle"] = per_cycle
        return d

    def get_state(self):
        """I return the current state of the crawler. This is a shallow copy
        of my state dictionary: nested values (lists, dicts) are shared with
        the crawler, so callers must not mutate them.

        Subclasses can override this to add computed keys to the return value,
        but don't forget to start with the upcall.
        """
        state = self.state.copy() # it isn't a deepcopy, so don't go crazy
        return state

    def load_state(self):
        # We use self.state to store state for both the crawler's internals
        # and anything the subclass-specific code needs. It is written to
        # disk by save_state(), which this class calls at the end of each
        # time slice, after each cycle completes, and from stopService().
        # The internal keys we use are:
        #  ["version"]: int, always 1
        #  ["last-cycle-finished"]: int, or None if we have not yet finished
        #                           any cycle
        #  ["current-cycle"]: int, or None if we are sleeping between cycles
        #  ["current-cycle-start-time"]: float, seconds-since-epoch of when
        #                                this cycle was started, possibly by
        #                                an earlier process
        #  ["last-complete-prefix"]: str, two-letter name of the last
        #                            prefixdir that was fully processed, or
        #                            None if we are sleeping between cycles,
        #                            or if we have not yet finished any
        #                            prefixdir since a cycle was started
        #  ["last-complete-bucket"]: str, base32 storage index bucket name
        #                            of the last bucket to be processed, or
        #                            None if we are sleeping between cycles
        #
        # A missing or unreadable statefile yields fresh state; other errors
        # (e.g. a corrupt pickle) propagate to the caller.
        try:
            f = open(self.statefile, "rb")
            try:
                state = pickle.load(f)
            finally:
                # close the handle even if unpickling fails
                f.close()
        except EnvironmentError:
            state = {"version": 1,
                     "last-cycle-finished": None,
                     "current-cycle": None,
                     "last-complete-prefix": None,
                     "last-complete-bucket": None,
                     }
        state.setdefault("current-cycle-start-time", time.time()) # approximate
        self.state = state
        lcp = state["last-complete-prefix"]
        if lcp is None:
            self.last_complete_prefix_index = -1
        else:
            self.last_complete_prefix_index = self.prefixes.index(lcp)
        self.add_initial_state()

    def add_initial_state(self):
        """Hook method to add extra keys to self.state when first loaded.

        The first time this Crawler is used, or when the code has been
        upgraded, the saved state file may not contain all the keys you
        expect. Use this method to add any missing keys. Simply modify
        self.state as needed.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def save_state(self):
        """Write self.state to the statefile.

        The pickle is written to '<statefile>.tmp' and then moved into place
        with fileutil.move_into_place().
        """
        lcpi = self.last_complete_prefix_index
        if lcpi == -1:
            last_complete_prefix = None
        else:
            last_complete_prefix = self.prefixes[lcpi]
        self.state["last-complete-prefix"] = last_complete_prefix
        tmpfile = self.statefile + ".tmp"
        f = open(tmpfile, "wb")
        try:
            pickle.dump(self.state, f)
        finally:
            # don't leak the handle if pickling fails
            f.close()
        fileutil.move_into_place(tmpfile, self.statefile)

    def startService(self):
        # arrange things to look like we were just sleeping, so
        # status/progress values work correctly
        self.sleeping_between_cycles = True
        self.current_sleep_time = self.slow_start
        self.next_wake_time = time.time() + self.slow_start
        self.timer = reactor.callLater(self.slow_start, self.start_slice)
        service.MultiService.startService(self)

    def stopService(self):
        # cancel any pending slice and persist progress before stopping
        if self.timer:
            self.timer.cancel()
            self.timer = None
        self.save_state()
        return service.MultiService.stopService(self)

    def start_slice(self):
        """Do one time slice of crawling, then schedule the next slice.

        Runs start_current_prefix() until it either finishes the cycle or
        raises TimeSliceExceeded, saves state, and (if the service is still
        running) schedules itself again after a sleep chosen to honor
        allowed_cpu_percentage and, between cycles, minimum_cycle_time.
        """
        start_slice = time.time()
        self.timer = None
        self.sleeping_between_cycles = False
        self.current_sleep_time = None
        self.next_wake_time = None
        try:
            self.start_current_prefix(start_slice)
            finished_cycle = True
        except TimeSliceExceeded:
            finished_cycle = False
        self.save_state()
        if not self.running:
            # someone might have used stopService() to shut us down
            return
        # either we finished a whole cycle, or we ran out of time
        now = time.time()
        this_slice = now - start_slice
        # this_slice/(this_slice+sleep_time) = percentage
        # this_slice/percentage = this_slice+sleep_time
        # sleep_time = (this_slice/percentage) - this_slice
        sleep_time = (this_slice / self.allowed_cpu_percentage) - this_slice
        # if the math gets weird, or a timequake happens, don't sleep
        # forever. Note that this means that, while a cycle is running, we
        # will process at least one bucket every 5 minutes, no matter how
        # long that bucket takes.
        sleep_time = max(0.0, min(sleep_time, 299))
        if finished_cycle:
            # how long should we sleep between cycles? Don't run faster than
            # allowed_cpu_percentage says, and also wait at least
            # minimum_cycle_time before starting the next cycle.
            self.sleeping_between_cycles = True
            sleep_time = max(sleep_time, self.minimum_cycle_time)
        else:
            self.sleeping_between_cycles = False
        self.current_sleep_time = sleep_time # for status page
        self.next_wake_time = now + sleep_time
        self.yielding(sleep_time)
        self.timer = reactor.callLater(sleep_time, self.start_slice)

    def start_current_prefix(self, start_slice):
        """Process prefixdirs, resuming after the last completed one.

        Starts a new cycle if none is in progress. Raises TimeSliceExceeded
        when the slice's time budget runs out; returns normally only after
        the whole cycle has been finished (and saved).
        """
        state = self.state
        if state["current-cycle"] is None:
            self.last_cycle_started_time = time.time()
            state["current-cycle-start-time"] = self.last_cycle_started_time
            if state["last-cycle-finished"] is None:
                state["current-cycle"] = 0
            else:
                state["current-cycle"] = state["last-cycle-finished"] + 1
            self.started_cycle(state["current-cycle"])
        cycle = state["current-cycle"]

        for i in range(self.last_complete_prefix_index+1, len(self.prefixes)):
            # if we want to yield earlier, just raise TimeSliceExceeded()
            prefix = self.prefixes[i]
            prefixdir = os.path.join(self.sharedir, prefix)
            if i == self.bucket_cache[0]:
                # resuming a prefix that was interrupted by the last slice
                buckets = self.bucket_cache[1]
            else:
                try:
                    buckets = os.listdir(prefixdir)
                    buckets.sort()
                except EnvironmentError:
                    # missing/unreadable prefixdir: treat as empty
                    buckets = []
                self.bucket_cache = (i, buckets)
            self.process_prefixdir(cycle, prefix, prefixdir,
                                   buckets, start_slice)
            self.last_complete_prefix_index = i

            now = time.time()
            if self.last_prefix_finished_time is not None:
                elapsed = now - self.last_prefix_finished_time
                self.last_prefix_elapsed_time = elapsed
            self.last_prefix_finished_time = now

            self.finished_prefix(cycle, prefix)
            if time.time() >= start_slice + self.cpu_slice:
                raise TimeSliceExceeded()

        # yay! we finished the whole cycle
        self.last_complete_prefix_index = -1
        self.last_prefix_finished_time = None # don't include the sleep
        now = time.time()
        if self.last_cycle_started_time is not None:
            self.last_cycle_elapsed_time = now - self.last_cycle_started_time
        state["last-complete-bucket"] = None
        state["last-cycle-finished"] = cycle
        state["current-cycle"] = None
        self.finished_cycle(cycle)
        self.save_state()

    def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
        """This gets a list of bucket names (i.e. storage index strings,
        base32-encoded) in sorted order.

        You can override this if your crawler doesn't care about the actual
        shares, for example a crawler which merely keeps track of how many
        buckets are being managed by this server.

        Subclasses which *do* care about actual buckets should leave this
        method alone, and implement process_bucket() instead.
        """

        for bucket in buckets:
            # skip buckets already handled before an interrupted slice;
            # None means nothing in this cycle has been processed yet
            last = self.state["last-complete-bucket"]
            if last is not None and bucket <= last:
                continue
            self.process_bucket(cycle, prefix, prefixdir, bucket)
            self.state["last-complete-bucket"] = bucket
            if time.time() >= start_slice + self.cpu_slice:
                raise TimeSliceExceeded()

    # the remaining methods are explicitly for subclasses to implement.

    def started_cycle(self, cycle):
        """Notify a subclass that the crawler is about to start a cycle.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def process_bucket(self, cycle, prefix, prefixdir, storage_index_b32):
        """Examine a single bucket. Subclasses should do whatever they want
        to do to the shares therein, then update self.state as necessary.

        If the crawler is never interrupted by SIGKILL, this method will be
        called exactly once per bucket (per cycle). If it *is* interrupted,
        then the next time the node is started, some amount of work will be
        duplicated, according to when self.save_state() was last called. By
        default, save_state() is called at the end of each timeslice, and
        after finished_cycle() returns, and when stopService() is called.

        To reduce the chance of duplicate work (i.e. to avoid adding multiple
        records to a database), you can call save_state() at the end of your
        process_bucket() method. This will reduce the maximum duplicated work
        to one bucket per SIGKILL. It will also add overhead, probably 1-20ms
        per bucket (and some disk writes), which will count against your
        allowed_cpu_percentage, and which may be considerable if
        process_bucket() runs quickly.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def finished_prefix(self, cycle, prefix):
        """Notify a subclass that the crawler has just finished processing a
        prefix directory (all buckets with the same two-character/10bit
        prefix). To impose a limit on how much work might be duplicated by a
        SIGKILL that occurs during a timeslice, you can call
        self.save_state() here, but be aware that it may represent a
        significant performance hit.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def finished_cycle(self, cycle):
        """Notify subclass that a cycle (one complete traversal of all
        prefixdirs) has just finished. 'cycle' is the number of the cycle
        that just finished. This method should perform summary work and
        update self.state to publish information to status displays.

        One-shot crawlers, such as those used to upgrade shares to a new
        format or populate a database for the first time, can call
        self.stopService() (or more likely self.disownServiceParent()) to
        prevent it from running a second time. Don't forget to set some
        persistent state so that the upgrader won't be run again the next
        time the node is started.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass

    def yielding(self, sleep_time):
        """The crawler is about to sleep for 'sleep_time' seconds. This
        method is mostly for the convenience of unit tests.

        This method is for subclasses to override. No upcall is necessary.
        """
        pass
434.
435.
class BucketCountingCrawler(ShareCrawler):
    """I keep track of how many buckets are being managed by this server.
    This is equivalent to the number of distributed files and directories for
    which I am providing storage. The actual number of files+directories in
    the full grid is probably higher (especially when there are more servers
    than 'N', the number of generated shares), because some files+directories
    will have shares on other servers instead of me. Also note that the
    number of buckets will differ from the number of shares in small grids,
    when more than one share is placed on a single server.
    """

    minimum_cycle_time = 60*60 # we don't need this more than once an hour

    def __init__(self, server, statefile, num_sample_prefixes=1):
        ShareCrawler.__init__(self, server, statefile)
        # how many of the (sorted) prefixes keep a sample of their SI names
        self.num_sample_prefixes = num_sample_prefixes

    def add_initial_state(self):
        # ["bucket-counts"][cyclenum][prefix] = number
        # ["last-cycle-finished"] = cyclenum # maintained by base class
        # ["last-complete-bucket-count"] = number
        # ["storage-index-samples"][prefix] = (cyclenum,
        #                                      list of SI strings (base32))
        self.state.setdefault("bucket-counts", {})
        self.state.setdefault("last-complete-bucket-count", None)
        self.state.setdefault("storage-index-samples", {})

    def process_prefixdir(self, cycle, prefix, prefixdir, buckets, start_slice):
        # We override process_prefixdir() because we don't want to look at
        # the individual buckets: counting the already-listed names is
        # enough. State is not saved here; the base class saves it at the
        # end of each time slice. On my laptop, a mostly-empty storage server
        # can process about 70 prefixdirs in a 1.0s slice.
        if cycle not in self.state["bucket-counts"]:
            self.state["bucket-counts"][cycle] = {}
        self.state["bucket-counts"][cycle][prefix] = len(buckets)
        if prefix in self.prefixes[:self.num_sample_prefixes]:
            self.state["storage-index-samples"][prefix] = (cycle, buckets)

    def finished_cycle(self, cycle):
        # per-prefix counts for the cycle that just ended ({} if none)
        last_counts = self.state["bucket-counts"].get(cycle, {})
        if len(last_counts) == len(self.prefixes):
            # great, we have a whole cycle.
            num_buckets = sum(last_counts.values())
            self.state["last-complete-bucket-count"] = num_buckets
            # get rid of old counts
            for old_cycle in list(self.state["bucket-counts"].keys()):
                if old_cycle != cycle:
                    del self.state["bucket-counts"][old_cycle]
        # get rid of old samples too
        for prefix in list(self.state["storage-index-samples"].keys()):
            old_cycle,buckets = self.state["storage-index-samples"][prefix]
            if old_cycle != cycle:
                del self.state["storage-index-samples"][prefix]
489.