# -*- python -*-

"""
Run this tool with twistd in its own directory, with a file named 'urls.txt'
describing which nodes to query. Make sure to copy diskwatcher.py into the
same directory. It will request disk-usage numbers from the nodes once per
hour (or more slowly), and store them in a local database. It will compute
usage-per-unit-time values over several time ranges and make them available
through an HTTP query (on the port given in ./webport). It will also provide
an estimate of how much time is left before the grid's storage is exhausted.

There are munin plugins (named tahoe_doomsday and tahoe_diskusage) to graph
the values this tool computes.

Each line of urls.txt points to a single node. Each node should have its own
dedicated disk: if multiple nodes share a disk, list only one of them in
urls.txt (otherwise that space will be double-counted, confusing the
results). Each line should be of the form:

 http://host:webport/statistics?t=json

"""
---|

# TODO:
# built-in graphs on web interface


import os.path, urllib, time
from datetime import timedelta
from twisted.application import internet, service, strports
from twisted.web import server, resource, http, client
from twisted.internet import defer
from twisted.python import log
import json
from axiom.attributes import AND
from axiom.store import Store
from epsilon import extime
from diskwatcher import Sample

---|
# Leftover one-shot migration snippet: this copied Samples from an older
# inline schema into the Sample item that now lives in diskwatcher.py.

#from axiom.item import Item
#from axiom.attributes import text, integer, timestamp

#class Sample(Item):
#    url = text()
#    when = timestamp()
#    used = integer()
#    avail = integer()

#s = Store("history.axiom")
#ns = Store("new-history.axiom")
#for sa in s.query(Sample):
#    diskwatcher.Sample(store=ns,
#                       url=sa.url, when=sa.when, used=sa.used, avail=sa.avail)
#print("done")
---|

HOUR = 3600
DAY = 24*3600
WEEK = 7*DAY
MONTH = 30*DAY
YEAR = 365*DAY

class DiskWatcher(service.MultiService, resource.Resource):
    POLL_INTERVAL = 1*HOUR
    AVERAGES = {#"60s": 60,
                #"5m": 5*60,
                #"30m": 30*60,
                "1hr": 1*HOUR,
                "1day": 1*DAY,
                "2wk": 2*WEEK,
                "4wk": 4*WEEK,
                }
---|

    def __init__(self):
        assert os.path.exists("diskwatcher.tac") # run from the right directory
        self.growth_cache = {}
        service.MultiService.__init__(self)
        resource.Resource.__init__(self)
        self.store = Store("history.axiom")
        self.store.whenFullyUpgraded().addCallback(self._upgrade_complete)
        service.IService(self.store).setServiceParent(self) # let upgrader run
        ts = internet.TimerService(self.POLL_INTERVAL, self.poll)
        ts.setServiceParent(self)

    def _upgrade_complete(self, ignored):
        print("Axiom store upgrade complete")

    def startService(self):
        service.MultiService.startService(self)

        try:
            with open("webport", "r") as f:
                desired_webport = f.read().strip()
        except EnvironmentError:
            desired_webport = None
        webport = desired_webport or "tcp:0"
        root = self
        serv = strports.service(webport, server.Site(root))
        serv.setServiceParent(self)
        if not desired_webport:
            got_port = serv._port.getHost().port
            with open("webport", "w") as f:
                f.write("tcp:%d\n" % got_port)
---|


    def get_urls(self):
        with open("urls.txt", "r") as f:
            for url in f:
                if "#" in url:
                    url = url[:url.find("#")]
                url = url.strip()
                if not url:
                    continue
                yield url
---|

    def poll(self):
        log.msg("polling..")
        #return self.poll_synchronous()
        return self.poll_asynchronous()

    def poll_asynchronous(self):
        # this didn't actually seem to work any better than poll_synchronous:
        # the logs are noisier, and I got frequent DNS failures. But with a
        # lot of servers to query, this is probably the better way to go. A
        # significant advantage of this approach is that we can use a
        # timeout= argument to tolerate hanging servers.
        dl = []
        for url in self.get_urls():
            when = extime.Time()
            d = client.getPage(url, timeout=60)
            d.addCallback(self.got_response, when, url)
            dl.append(d)
        d = defer.DeferredList(dl)
        def _done(res):
            fetched = len([1 for (success, value) in res if success])
            log.msg("fetched %d of %d" % (fetched, len(dl)))
        d.addCallback(_done)
        return d
---|

    def poll_synchronous(self):
        attempts = 0
        fetched = 0
        for url in self.get_urls():
            attempts += 1
            try:
                when = extime.Time()
                # if a server accepts the connection and then hangs, this
                # will block forever
                data_json = urllib.urlopen(url).read()
                self.got_response(data_json, when, url)
                fetched += 1
            except Exception:
                log.msg("error while fetching: %s" % url)
                log.err()
        log.msg("fetched %d of %d" % (fetched, attempts))
---|

    def got_response(self, data_json, when, url):
        data = json.loads(data_json)
        total = data[u"stats"][u"storage_server.disk_total"]
        used = data[u"stats"][u"storage_server.disk_used"]
        avail = data[u"stats"][u"storage_server.disk_avail"]
        print("%s : total=%s, used=%s, avail=%s" % (url,
                                                    total, used, avail))
        Sample(store=self.store,
               url=unicode(url), when=when, total=total, used=used,
               avail=avail)
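
    # For reference, a minimal sketch of the node response this expects
    # (values are made up; only the keys read above are shown):
    #   {"stats": {"storage_server.disk_total": 106000000000,
    #              "storage_server.disk_used":  54000000000,
    #              "storage_server.disk_avail": 52000000000}}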
---|

    def calculate_growth_timeleft(self):
        timespans = []
        total_avail_space = self.find_total_available_space()
        pairs = [ (timespan,name)
                  for name,timespan in self.AVERAGES.items() ]
        pairs.sort()
        for (timespan,name) in pairs:
            growth = self.growth(timespan)
            print(name, total_avail_space, growth)
            if growth is not None:
                timeleft = None
                if growth > 0:
                    timeleft = total_avail_space / growth
                timespans.append( (name, timespan, growth, timeleft) )
        return timespans
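
    # Worked example (all numbers made up): if the "4wk" growth rate comes
    # out at 5e6 bytes/sec and 1e13 bytes (10 TB) are still available, then
    # timeleft = 1e13 / 5e6 = 2e6 seconds, i.e. roughly 23 days.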
---|

    def find_total_space(self):
        # this returns the sum of disk-total stats for all servers that 1)
        # are listed in urls.txt and 2) have responded recently.
        now = extime.Time()
        recent = now - timedelta(seconds=2*self.POLL_INTERVAL)
        total_space = 0
        for url in self.get_urls():
            url = unicode(url)
            latest = list(self.store.query(Sample,
                                           AND(Sample.url == url,
                                               Sample.when > recent),
                                           sort=Sample.when.descending,
                                           limit=1))
            if latest:
                total_space += latest[0].total
        return total_space
---|

    def find_total_available_space(self):
        # this returns the sum of disk-avail stats for all servers that 1)
        # are listed in urls.txt and 2) have responded recently.
        now = extime.Time()
        recent = now - timedelta(seconds=2*self.POLL_INTERVAL)
        total_avail_space = 0
        for url in self.get_urls():
            url = unicode(url)
            latest = list(self.store.query(Sample,
                                           AND(Sample.url == url,
                                               Sample.when > recent),
                                           sort=Sample.when.descending,
                                           limit=1))
            if latest:
                total_avail_space += latest[0].avail
        return total_avail_space
---|

    def find_total_used_space(self):
        # this returns the sum of disk-used stats for all servers that 1)
        # are listed in urls.txt and 2) have responded recently.
        now = extime.Time()
        recent = now - timedelta(seconds=2*self.POLL_INTERVAL)
        total_used_space = 0
        for url in self.get_urls():
            url = unicode(url)
            latest = list(self.store.query(Sample,
                                           AND(Sample.url == url,
                                               Sample.when > recent),
                                           sort=Sample.when.descending,
                                           limit=1))
            if latest:
                total_used_space += latest[0].used
        return total_used_space
---|


    def growth(self, timespan):
        """Calculate the bytes-per-second growth of the total disk-used stat,
        over a period of TIMESPAN seconds (i.e. between the most recent
        sample and the latest one that is at least TIMESPAN seconds old),
        summed over all nodes which 1) are listed in urls.txt, 2) have
        responded recently, and 3) have a response at least as old as
        TIMESPAN. If no nodes meet these criteria, we return None; this is
        likely to happen for the longer timespans (4wk) until the gatherer
        has been running and collecting data for that long."""
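
        # Worked example (numbers made up): if a node's disk-used stat was
        # 100e9 bytes one week ago and is 105e9 bytes now, that node
        # contributes (105e9 - 100e9) / 604800 = about 8268 bytes/sec.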
---|

        # A note about workload: for our oldest storage servers, as of
        # 25-Jan-2009, the first DB query here takes about 40ms per server
        # URL (some take as little as 10ms). There are about 110 servers,
        # and two queries each, so the growth() function takes about 7s to
        # run per timespan. We track 4 timespans, and find_total_*_space()
        # takes about 2.3s to run, so calculate_growth_timeleft() takes
        # about 27s. Each HTTP query thus takes 27s, and we have six munin
        # plugins which perform HTTP queries every 5 minutes. By adding the
        # growth_cache, I hope to reduce this: the first HTTP query will
        # still take 27s, but the subsequent five should take about 2.3s
        # each.

        # we're allowed to cache this value for 3 minutes
        if timespan in self.growth_cache:
            (when, value) = self.growth_cache[timespan]
            if time.time() - when < 3*60:
                return value
---|

        td = timedelta(seconds=timespan)
        now = extime.Time()
        then = now - td
        recent = now - timedelta(seconds=2*self.POLL_INTERVAL)

        total_growth = 0.0
        num_nodes = 0

        for url in self.get_urls():
            url = unicode(url)
            latest = list(self.store.query(Sample,
                                           AND(Sample.url == url,
                                               Sample.when > recent),
                                           sort=Sample.when.descending,
                                           limit=1))
            if not latest:
                #print("no latest sample from", url)
                continue # skip this node
            latest = latest[0]
            old = list(self.store.query(Sample,
                                        AND(Sample.url == url,
                                            Sample.when < then),
                                        sort=Sample.when.descending,
                                        limit=1))
            if not old:
                #print("no old sample from", url)
                continue # skip this node
            old = old[0]
            duration = latest.when.asPOSIXTimestamp() - old.when.asPOSIXTimestamp()
            if not duration:
                print("only one sample from", url)
                continue

            rate = float(latest.used - old.used) / duration
            #print(url, rate)
            total_growth += rate
            num_nodes += 1

        if not num_nodes:
            return None
        self.growth_cache[timespan] = (time.time(), total_growth)
        return total_growth
---|

    def getChild(self, path, req):
        if path == "":
            return self
        return resource.Resource.getChild(self, path, req)
---|

    def abbreviate_time(self, s):
        def _plural(count, unit):
            count = int(count)
            if count == 1:
                return "%d %s" % (count, unit)
            return "%d %ss" % (count, unit)
        if s is None:
            return "unknown"
        if s < 120:
            return _plural(s, "second")
        if s < 3*HOUR:
            return _plural(s/60, "minute")
        if s < 2*DAY:
            return _plural(s/HOUR, "hour")
        if s < 2*MONTH:
            return _plural(s/DAY, "day")
        if s < 4*YEAR:
            return _plural(s/MONTH, "month")
        return _plural(s/YEAR, "year")
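
    # e.g. abbreviate_time(90000) == "25 hours", abbreviate_time(None) ==
    # "unknown".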
---|

    def abbreviate_space2(self, s, SI=True):
        if s is None:
            return "unknown"
        if SI:
            U = 1000.0
            isuffix = "B"
        else:
            U = 1024.0
            isuffix = "iB"
        def r(count, suffix):
            return "%.2f %s%s" % (count, suffix, isuffix)

        if s < 1024: # 1000-1023 get emitted as bytes, even in SI mode
            return "%.2f B" % s # bare bytes take no "i" infix
        if s < U*U:
            return r(s/U, "k")
        if s < U*U*U:
            return r(s/(U*U), "M")
        if s < U*U*U*U:
            return r(s/(U*U*U), "G")
        if s < U*U*U*U*U:
            return r(s/(U*U*U*U), "T")
        return r(s/(U*U*U*U*U), "P")

    def abbreviate_space(self, s):
        return "(%s, %s)" % (self.abbreviate_space2(s, True),
                             self.abbreviate_space2(s, False))
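
    # e.g. abbreviate_space(1500000) == "(1.50 MB, 1.43 MiB)".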
---|

    def render(self, req):
        t = req.args.get("t", ["html"])[0]
        ctype = "text/plain"
        data = ""
        if t == "html":
            data = ""
            for (name, timespan, growth, timeleft) in self.calculate_growth_timeleft():
                data += "%f bytes per second (%sps), %s remaining (over %s)\n" % \
                        (growth, self.abbreviate_space2(growth, True),
                         self.abbreviate_time(timeleft), name)
            used = self.find_total_used_space()
            data += "total used: %d bytes %s\n" % (used,
                                                   self.abbreviate_space(used))
            total = self.find_total_space()
            data += "total space: %d bytes %s\n" % (total,
                                                    self.abbreviate_space(total))
        elif t == "json":
            current = {"rates": self.calculate_growth_timeleft(),
                       "total": self.find_total_space(),
                       "used": self.find_total_used_space(),
                       "available": self.find_total_available_space(),
                       }
            data = json.dumps(current, indent=True)
        else:
            req.setResponseCode(http.BAD_REQUEST)
            data = "Unknown t= %s\n" % t
        req.setHeader("content-type", ctype)
        return data
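
    # Example query against the running service (the port is whatever ended
    # up in ./webport; 8080 here is hypothetical):
    #   $ curl 'http://localhost:8080/?t=json'
    # returns a JSON object with "rates", "total", "used", and "available"
    # keys, matching the dict built above.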
---|

application = service.Application("disk-watcher")
DiskWatcher().setServiceParent(application)