source: trunk/misc/operations_helpers/cpu-watcher.tac

Last change on this file was b856238, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-15T15:53:34Z

remove old Python2 future statements

  • Property mode set to 100644
File size: 8.3 KB
Line 
1# -*- python -*-
2
3
4"""
5# run this tool on a linux box in its own directory, with a file named
6# 'pids.txt' describing which processes to watch. It will follow CPU usage of
7# the given processes, and compute 1/5/15-minute moving averages for each
8# process. These averages can be retrieved from a foolscap connection
9# (published at ./watcher.furl), or through an HTTP query (using ./webport).
10
11# Each line of pids.txt describes a single process. Blank lines and ones that
12# begin with '#' are ignored. Each line is either "PID" or "PID NAME" (space
13# separated). PID is either a numeric process ID, a pathname to a file that
14# contains a process ID, or a pathname to a directory that contains a
15# twistd.pid file (which contains a process ID). NAME is an arbitrary string
16# that will be used to describe the process to watcher.furl subscribers, and
17# defaults to PID if not provided.
18"""
19
20# TODO:
21#  built-in graphs on web interface
22
23
import json
import os.path
import pickle
import pprint
import time

from foolscap import Tub, Referenceable, RemoteInterface, eventual
from foolscap.schema import ListOf, TupleOf
from twisted.application import internet, service, strports
from twisted.python import log
from twisted.web import server, resource, http
from zope.interface import implementer, implements
32
def read_cpu_times(pid):
    """Return the CPU time consumed so far by the process `pid`.

    Reads /proc/<pid>/stat (so this only works on Linux) and returns a tuple
    of (userspace_seconds, system_seconds), both floats. The time spent by
    waited-for children (cutime/cstime) is not included.

    Raises EnvironmentError if the stat file cannot be read (e.g. the
    process has exited), and ValueError or IndexError if its contents are
    not in the expected format.
    """
    with open("/proc/%d/stat" % pid, "r") as f:
        data = f.read()
    # The second field is the executable name wrapped in parentheses, and
    # that name may itself contain spaces or parentheses. Only the text
    # after the *last* ')' can be safely split on whitespace.
    fields = data[data.rindex(")") + 1:].split()
    # fields[0] is 'state' (field 3 of the stat line), so utime (field 14)
    # and stime (field 15) live at offsets 11 and 12.
    user_ticks = int(fields[11])
    system_ticks = int(fields[12])
    # The values are expressed in clock ticks. Ask the OS how long a tick
    # is, falling back to the customary 100 ticks per second.
    try:
        ticks_per_second = os.sysconf("SC_CLK_TCK")
    except (AttributeError, ValueError, OSError):
        ticks_per_second = 100
    if ticks_per_second <= 0:
        ticks_per_second = 100
    userspace_seconds = user_ticks / float(ticks_per_second)
    system_seconds = system_ticks / float(ticks_per_second)
    return (userspace_seconds, system_seconds)
46
47
def read_pids_txt():
    """Parse ./pids.txt and return a list of (pid, name) tuples.

    Blank lines and lines starting with '#' are ignored. The first word of
    each remaining line is either a numeric PID, the path of a file holding
    a PID, or the path of a directory holding a 'twistd.pid' file. The
    second word, if present, is the name to report; otherwise the first
    word is used as the name.

    Entries whose pidfile is missing, unreadable, or does not contain a
    number are skipped, so one stale entry does not stop the others from
    being watched.

    Raises EnvironmentError if pids.txt itself cannot be read.
    """
    with open("pids.txt", "r") as f:
        lines = f.readlines()

    processes = []
    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split()
        pidthing = parts[0]
        name = parts[1] if len(parts) > 1 else pidthing
        try:
            pid = int(pidthing)
        except ValueError:
            # not a number: treat it as a pidfile, or a directory with one
            pidfile = os.path.expanduser(pidthing)
            if os.path.isdir(pidfile):
                pidfile = os.path.join(pidfile, "twistd.pid")
            try:
                with open(pidfile, "r") as f:
                    pid = int(f.read().strip())
            except (EnvironmentError, ValueError):
                # missing, unreadable, or garbage pidfile: skip this entry
                pid = None
        if pid is not None:
            processes.append( (pid, name) )
    return processes
74
# Foolscap schema for the table of averages: a list of
# (name, 1-min-avg, 5-min-avg, 15-min-avg) rows.
# NOTE(review): CPUWatcher._average_N() returns None until enough samples
# have been collected, which does not look like it satisfies 'float' here --
# confirm how foolscap treats such rows.
Averages = ListOf( TupleOf(str, float, float, float) )
class RICPUWatcherSubscriber(RemoteInterface):
    # Interface for the observers that are handed to RICPUWatcher.subscribe().
    def averages(averages=Averages):
        """Receive the current table of averages (see the Averages schema).
        CPUWatcher.notify() sends this message to each subscribed observer."""
        return None
79
class RICPUWatcher(RemoteInterface):
    # Interface published by CPUWatcher (registered at ./watcher.furl).
    def get_averages():
        """Return a list of rows, one for each process I am watching. Each
        row is (name, 1-min-avg, 5-min-avg, 15-min-avg), where 'name' is a
        string and each average is the fraction of CPU time the process used
        over that window: the change in CPU time (user+system) divided by
        the change in wallclock time, so 1.0 means one fully-busy CPU. The
        CPUWatcher implementation reports None for an average until it has
        collected enough samples to cover that window.
        """
        return Averages

    def subscribe(observer=RICPUWatcherSubscriber):
        """Arrange for the given observer to get an 'averages' message every
        time the averages are updated. This message will contain a single
        argument, the same list of tuples that get_averages() returns."""
        return None
95
@implementer(RICPUWatcher)
class CPUWatcher(service.MultiService, resource.Resource, Referenceable):
    """Poll the processes listed in pids.txt and publish their CPU usage.

    Every POLL_INTERVAL seconds, poll() samples the cumulative CPU time of
    each process, keeps about HISTORY_LIMIT seconds of samples per pid
    (persisted in ./history.pickle), and recomputes one moving average per
    entry in AVERAGES. The latest table is kept in self.current and is
    available over HTTP (render), over foolscap (remote_get_averages), and
    is pushed to every subscribed observer (remote_subscribe / notify).
    """
    POLL_INTERVAL = 30 # seconds
    HISTORY_LIMIT = 15 * 60 # 15min
    AVERAGES = (1*60, 5*60, 15*60) # 1min, 5min, 15min

    def __init__(self):
        service.MultiService.__init__(self)
        resource.Resource.__init__(self)
        # maps pid -> list of (wallclock, user_seconds, sys_seconds) samples
        try:
            # NOTE: unpickling is only acceptable here because
            # history.pickle is a local file written by poll(); never point
            # this at data from an untrusted source.
            with open("history.pickle", "rb") as f:
                self.history = pickle.load(f)
        except Exception:
            # missing or unreadable history: start from scratch
            self.history = {}
        self.current = []
        self.observers = set()
        ts = internet.TimerService(self.POLL_INTERVAL, self.poll)
        ts.setServiceParent(self)

    @staticmethod
    def _read_port_file(filename):
        """Return the stripped contents of `filename`, or None if it cannot
        be read."""
        try:
            with open(filename, "r") as f:
                return f.read().strip()
        except EnvironmentError:
            return None

    @staticmethod
    def _write_port_file(filename, portnum):
        """Record the TCP port we were allocated, as a strports string, so
        that the same port is requested on the next start."""
        with open(filename, "w") as f:
            f.write("tcp:%d\n" % portnum)

    def startService(self):
        """Start the web server and the foolscap Tub.

        Each listens on the port named in ./webport or ./tubport. If that
        file is missing or empty, a kernel-assigned port is used and then
        written back to the file.
        """
        service.MultiService.startService(self)

        desired_webport = self._read_port_file("webport")
        webport = desired_webport or "tcp:0"
        root = self
        serv = strports.service(webport, server.Site(root))
        serv.setServiceParent(self)
        if not desired_webport:
            # NOTE: this reaches into the private _port attribute of the
            # strports service to learn which port was allocated.
            self._write_port_file("webport", serv._port.getHost().port)

        self.tub = Tub(certFile="watcher.pem")
        self.tub.setServiceParent(self)
        desired_tubport = self._read_port_file("tubport")
        tubport = desired_tubport or "tcp:0"
        l = self.tub.listenOn(tubport)
        if not desired_tubport:
            self._write_port_file("tubport", l.getPortnum())
        d = self.tub.setLocationAutomatically()
        d.addCallback(self._tub_ready)
        d.addErrback(log.err)

    def _tub_ready(self, res):
        """Publish ourselves once the Tub knows its location; the FURL is
        written to ./watcher.furl."""
        self.tub.registerReference(self, furlFile="watcher.furl")


    def getChild(self, path, req):
        """Serve the root URL ourselves; defer everything else to Resource."""
        # 'path' is bytes on Python 3 and str on Python 2, so test for
        # emptiness instead of comparing against a str literal.
        if not path:
            return self
        return resource.Resource.getChild(self, path, req)

    def render(self, req):
        """Render the current averages.

        The query argument t= selects the format: 'html' (the default)
        gives a pprint-formatted table, 'json' gives JSON, and anything else
        yields a 400 response. The body is always sent as text/plain.
        """
        # twisted.web provides query arguments as bytes on Python 3
        t = req.args.get(b"t", [b"html"])[0]
        if isinstance(t, bytes):
            t = t.decode("utf-8", "replace")
        ctype = "text/plain"
        data = ""
        if t == "html":
            data = "# name, 1min, 5min, 15min\n"
            data += pprint.pformat(self.current) + "\n"
        elif t == "json":
            data = json.dumps(self.current, indent=True)
        else:
            req.setResponseCode(http.BAD_REQUEST)
            data = "Unknown t= %s\n" % t
        req.setHeader("content-type", ctype)
        # the response body must be bytes
        if not isinstance(data, bytes):
            data = data.encode("utf-8")
        return data

    def remote_get_averages(self):
        """Return the most recently computed table of averages."""
        return self.current

    def remote_subscribe(self, observer):
        """Remember `observer`; it gets an 'averages' message after each
        poll."""
        self.observers.add(observer)

    def notify(self, observer):
        """Send the current averages to `observer`, and forget the observer
        if the call fails."""
        d = observer.callRemote("averages", self.current)
        def _error(f):
            log.msg("observer error, removing them")
            log.msg(f)
            self.observers.discard(observer)
        d.addErrback(_error)

    def poll(self):
        """Sample every watched process, persist the history, recompute
        self.current, and schedule a notification to each observer."""
        # integer division: this is a sample count, later used to trim lists
        max_history = self.HISTORY_LIMIT // self.POLL_INTERVAL
        try:
            processes = read_pids_txt()
        except Exception:
            log.err()
            return
        for (pid, name) in processes:
            samples = self.history.setdefault(pid, [])
            now = time.time()
            try:
                (user_seconds, sys_seconds) = read_cpu_times(pid)
            except Exception:
                log.msg("error reading process %s (%s), ignoring" % (pid, name))
                log.err()
                continue
            samples.append( (now, user_seconds, sys_seconds) )
            # keep only the newest max_history+1 samples
            del samples[:-(max_history+1)]
        try:
            # Newer protocols won't work in Python 2; when it is dropped,
            # protocol v4 can be used (added in Python 3.4).
            # Close the temp file before renaming it, so the complete pickle
            # is on disk when it replaces the old one.
            with open("history.pickle.tmp", "wb") as f:
                pickle.dump(self.history, f, protocol=2)
            os.rename("history.pickle.tmp", "history.pickle")
        except Exception:
            # saving history is best-effort, but say why it failed
            log.msg("unable to save history.pickle, continuing")
            log.err()
        current = []
        for (pid, name) in processes:
            row = [name]
            for avg in self.AVERAGES:
                row.append(self._average_N(pid, avg))
            current.append(tuple(row))
        self.current = current
        print(current)
        # iterate over a copy: notify()'s errback removes failed observers
        for ob in list(self.observers):
            eventual.eventually(self.notify, ob)

    def _average_N(self, pid, seconds):
        """Return the CPU usage of `pid` over the last `seconds` seconds.

        The result is (user + system CPU seconds) / (wallclock seconds),
        measured between the newest sample and the one taken
        seconds/POLL_INTERVAL polls earlier. Returns None when there are not
        yet enough samples to cover the window, and 0.0 if no wallclock time
        elapsed between the two samples.
        """
        # integer division: num_samples is used as a list index below
        num_samples = seconds // self.POLL_INTERVAL
        samples = self.history[pid]
        if len(samples) < num_samples+1:
            return None
        oldest = samples[-num_samples-1]
        newest = samples[-1]
        elapsed_wall = newest[0] - oldest[0]
        elapsed_user = newest[1] - oldest[1]
        elapsed_sys = newest[2] - oldest[2]
        if elapsed_wall == 0.0:
            return 0.0
        return (elapsed_user+elapsed_sys) / elapsed_wall
231
# Build the Application object for this .tac file and attach the watcher
# service to it.
application = service.Application("cpu-watcher")
_watcher = CPUWatcher()
_watcher.setServiceParent(application)
Note: See TracBrowser for help on using the repository browser.