Ticket #302: ringsim.2.py

File ringsim.2.py, 10.7 KB (added by warner, at 2009-12-26T05:59:43Z)

Brian's v2 simulator, prints nodeid gaps and min/max nodeid

Line 
1#! /usr/bin/python
2
3import time
4import random, math
5from hashlib import sha1, md5, sha256
6sha1 = md5
7# md5: 1520 "uploads" per second
8# sha1: 1350 ups
9# sha256: 930 ups
10from itertools import count
11from twisted.python import usage
12
def abbreviate_space(s, SI=True):
    """Format a byte count as a short human-readable string.

    ``s`` is a number of bytes, or None (reported as "unknown").  When
    ``SI`` is true the scale factor is 1000 and units look like "kB";
    otherwise the scale factor is 1024 and units look like "kiB".  Values
    below 1024 are always printed as whole bytes.
    """
    if s is None:
        return "unknown"
    if s < 1024:
        # Deliberate: 1000-1023 are still shown as plain bytes in SI mode.
        return "%d B" % s

    if SI:
        base, unit = 1000.0, "B"
    else:
        base, unit = 1024.0, "iB"

    # Walk up the prefixes; `divisor` is the size of one unit of the
    # current prefix, so the value fits as soon as s < divisor * base.
    divisor = base
    for prefix in ("k", "M", "G", "T"):
        if s < divisor * base:
            return "%.2f %s%s" % (s / divisor, prefix, unit)
        divisor *= base
    # Anything larger than the tera range is reported in peta units.
    return "%.2f %s%s" % (s / divisor, "P", unit)
36
def make_up_a_file_size(max=2**31):
    """Return a random file size drawn uniformly from [0, max).

    With the default bound of 2**31 the mean is about 2**30 bytes.  (An
    earlier variant picked a random power of two between 2**8 and 2**30
    instead.)
    """
    upper_bound = max
    return random.randrange(upper_bound)
# Estimate the mean file size empirically from 10000 samples; the result is
# used by do_run()/do_ring() to predict how many uploads the grid can hold.
sizes = []
for i in range(10000):
    sizes.append(make_up_a_file_size())
# Floor division: identical to `/` on the integer operands used here.
avg_filesize = sum(sizes) // len(sizes)
print(" ".join(["average file size:", abbreviate_space(avg_filesize)]))

# Capacity given to every server, in bytes (10**15).
SERVER_CAPACITY = 1000 * 10**12
45
class Server:
    """A storage server with a fixed capacity that accepts shares."""

    def __init__(self, nodeid, capacity):
        """Create an empty server identified by ``nodeid``."""
        self.nodeid = nodeid
        self.capacity = capacity
        self.used = 0
        self.numshares = 0
        # Never assigned by this class itself; __repr__ reports it when some
        # caller has filled it in.
        self.full_at_tick = None

    def upload(self, sharesize):
        """Try to store one share of ``sharesize`` bytes.

        Returns True and records the share if usage stays strictly below
        capacity afterwards; returns False and changes nothing otherwise.
        """
        if self.used + sharesize >= self.capacity:
            return False
        self.used += sharesize
        self.numshares += 1
        return True

    def __repr__(self):
        cls_name = self.__class__.__name__
        if self.full_at_tick is None:
            return "<%s %s>" % (cls_name, self.nodeid)
        return "<%s %s full at %d>" % (cls_name, self.nodeid, self.full_at_tick)
66
class Ring:
    """A ring of servers ordered by nodeid, with share-placement helpers.

    Server nodeids are hex digests derived from the seed, so the same seed
    always produces the same ring.  ``permute`` selects between a per-file
    permuted server order and a flat (rotated) ring order.
    """

    def __init__(self, numservers, seed, permute):
        """Build ``numservers`` servers and print the gap before each one.

        Each server gets ``next_diff``/``prev_diff`` attributes: the
        distance around the ring to its successor/predecessor, as a
        zero-padded hex string.
        """
        self.servers = [
            Server(sha1(str(seed) + str(i)).hexdigest(), SERVER_CAPACITY)
            for i in range(numservers)
        ]
        self.servers.sort(key=lambda s: s.nodeid)
        self._assign_gaps()
        for s in self.servers:
            print("%s %s" % (s, s.prev_diff))

        print("sorted by delta")
        for s in sorted(self.servers, key=lambda s: s.prev_diff):
            print("%s %s" % (s, s.prev_diff))
        self.permute = permute

    def _assign_gaps(self):
        """Set next_diff/prev_diff on every server in self.servers."""
        total = len(self.servers)
        for i, s in enumerate(self.servers):
            digits = len(s.nodeid)
            # Distances are taken modulo the size of the nodeid space so the
            # gap that wraps from the last server back to the first is a
            # positive number instead of a negative one (which would format
            # with a leading "-" and sort ahead of every real gap).
            ring_size = 16 ** digits
            here = int(s.nodeid, 16)
            after = int(self.servers[(i + 1) % total].nodeid, 16)
            before = int(self.servers[(i - 1) % total].nodeid, 16)
            s.next_diff = "%0*x" % (digits, (after - here) % ring_size)
            s.prev_diff = "%0*x" % (digits, (here - before) % ring_size)

    def servers_for_si(self, si):
        """Return all servers in the order they should be tried for ``si``.

        Permuted mode sorts the servers by hash(nodeid + si).  Flat mode
        starts at the first server whose nodeid is >= si and proceeds
        around the ring; if no nodeid is that large the ring order is
        returned unchanged.
        """
        if self.permute:
            return sorted(self.servers,
                          key=lambda s: sha1(s.nodeid + si).digest())
        for i, s in enumerate(self.servers):
            if s.nodeid >= si:
                return self.servers[i:] + self.servers[:i]
        return list(self.servers)

    def show_servers(self, picked):
        """Return a string of "1"/"0", one per ring server, marking those in ``picked``."""
        return "".join(["1" if s in picked else "0" for s in self.servers])

    def dump_usage(self, numfiles, avg_space_per_file):
        """Print per-server, per-file usage statistics after ``numfiles`` uploads.

        ``avg_space_per_file`` is the expected grid-wide ciphertext per
        file.  ``numfiles`` must be non-zero.
        """
        print("uploaded %s" % (numfiles,))
        # used: actual per-server ciphertext, largest first
        used = sorted([s.used for s in self.servers], reverse=True)
        # usedpf: actual per-server-per-file ciphertext
        usedpf = [1.0 * u / numfiles for u in used]
        # avg_usage_per_file: expected per-server-per-file ciphertext
        avg_usage_per_file = avg_space_per_file / len(self.servers)
        spreadpf = usedpf[0] - usedpf[-1]
        average_usagepf = sum(usedpf) / len(usedpf)
        if len(usedpf) > 1:
            squares = sum([(u - average_usagepf) ** 2 for u in usedpf])
            variance = squares / (len(usedpf) - 1)
        else:
            # Sample variance is undefined for a single server; report no
            # deviation rather than dividing by zero.
            variance = 0.0
        std_deviation = math.sqrt(variance)
        sd_of_total = std_deviation / avg_usage_per_file

        # The three summaries share one output line, separated by spaces.
        summary = [
            "min/max/(exp) usage-pf-ps %s/%s/(%s):" % (
                abbreviate_space(usedpf[-1]),
                abbreviate_space(usedpf[0]),
                abbreviate_space(avg_usage_per_file)),
            "spread-pf: %s (%.2f%%)" % (
                abbreviate_space(spreadpf),
                100.0 * spreadpf / avg_usage_per_file),
            "stddev: %s (%.2f%%)" % (
                abbreviate_space(std_deviation), 100.0 * sd_of_total),
        ]
        print(" ".join(summary))
        s2 = sorted(self.servers, key=lambda s: s.used)
        print("least: %s" % (s2[0].nodeid,))
        print("most: %s" % (s2[-1].nodeid,))
140
141
class Options(usage.Options):
    """Command-line options for the ring simulator."""

    optParameters = [
        ("k", "k", 3, "required shares", int),
        ("N", "N", 10, "total shares", int),
        ("servers", None, 100, "number of servers", int),
        ("seed", None, None, "seed to use for creating ring"),
        ("permute", "p", 1, "1 to permute, 0 to use flat ring", int),
        ]

    def postOptions(self):
        """Reject the command line unless a non-empty --seed was given.

        Raises usage.UsageError instead of relying on ``assert``, which is
        stripped when Python runs with -O and carries no message.
        """
        if not self["seed"]:
            raise usage.UsageError(
                "--seed is required (it determines the server nodeids)")
152
153
def do_run(ring, opts):
    """Upload random files to ``ring`` until some server refuses a share.

    Each file is split into opts["N"] shares of size filesize // opts["k"],
    placed on the first N servers returned by ring.servers_for_si().  Usage
    statistics are dumped every 4000 files.

    Returns the number of files that were fully uploaded before the first
    refusal.
    """
    # Floor division keeps these integers (same result as `/` on ints in
    # Python 2).
    avg_space_per_file = avg_filesize * opts["N"] // opts["k"]
    for filenum in count(0):
        filesize = make_up_a_file_size()
        sharesize = filesize // opts["k"]
        si = sha1(str(random.randrange(2**40))).hexdigest()
        if filenum % 4000 == 0 and filenum > 1:
            ring.dump_usage(filenum, avg_space_per_file)
        servers = ring.servers_for_si(si)
        for index in range(opts["N"]):
            # Wrap around when there are more shares than servers, so some
            # servers hold several shares of the file (as the older go()
            # loop does) instead of running off the end of the list.
            s = servers[index % len(servers)]
            if not s.upload(sharesize):
                return filenum  # number of files successfully uploaded
177
178
def do_ring(opts):
    """Print the expected upload count and placement mode, then simulate.

    Builds a Ring from opts["servers"], opts["seed"] and opts["permute"]
    and runs do_run() on it.  The upload count returned by do_run() is not
    used.
    """
    avg_space_per_file = avg_filesize * opts["N"] / opts["k"]
    total_capacity = opts["servers"] * SERVER_CAPACITY
    avg_files = total_capacity / avg_space_per_file
    print(" ".join(["expected number of uploads:", str(avg_files)]))

    if opts["permute"]:
        mode = " PERMUTED"
    else:
        mode = " LINEAR"
    print(mode)

    ring = Ring(opts["servers"], opts["seed"], opts["permute"])
    do_run(ring, opts)
193
def run(opts):
    """Run one ring simulation with the given parsed options."""
    do_ring(opts)


if __name__ == "__main__":
    # Script entry point: parse the command line, then simulate.
    opts = Options()
    opts.parseOptions()
    run(opts)
201
202
def go(opts):
    """Older fill simulation: place shares until every server is full.

    ``opts`` is used as the "permuted peer list" flag: the only visible
    caller, test(), passes its ``permutedpeerlist`` argument here.  When it
    is true the server list is shuffled for every file; otherwise the list
    is rotated by a random amount.

    Returns a tuple (servers, doubled_up_shares, doubled_up_at_tick), where
    doubled_up_shares counts shares placed after wrapping past the end of
    the server list and doubled_up_at_tick is the tick of the first such
    share (or None).

    NOTE(review): this function still relies on names that are not defined
    in the visible source -- SERVERS, K, N, a zero-argument Server()
    constructor, and ``si``/``max`` attributes on servers -- so it cannot
    run against the Server class in this file until those are supplied.
    """
    # The body previously read an undefined global of this name and ignored
    # its parameter; take the flag from the parameter instead.
    permutedpeerlist = opts

    servers = [ Server() for x in range(SERVERS) ]
    servers.sort(key=lambda server: server.si)

    doubled_up_shares = 0
    doubled_up_at_tick = None
    tick = 0  # one tick per simulated file
    fullservers = 0
    while True:
        nextsharesize = make_up_a_file_size() / K
        if permutedpeerlist:
            random.shuffle(servers)
        else:
            # rotate a random number
            rot = random.randrange(0, len(servers))
            servers = servers[rot:] + servers[:rot]

        i = 0
        wrapped = False
        sharestoput = N
        while sharestoput:
            server = servers[i]
            if server.used + nextsharesize < server.max:
                server.used += nextsharesize
                sharestoput -= 1
                if wrapped:
                    # This share landed on a server already visited for
                    # this file.
                    doubled_up_shares += 1
                    if doubled_up_at_tick is None:
                        doubled_up_at_tick = tick
            else:
                if server.full_at_tick is None:
                    # First time this server refused a share.
                    server.full_at_tick = tick
                    fullservers += 1
                    if fullservers == len(servers):
                        # Couldn't place share -- all servers full.  Stop.
                        return (servers, doubled_up_shares, doubled_up_at_tick)

            i += 1
            if i == len(servers):
                wrapped = True
                i = 0

        tick += 1
246
def div_ceil(n, d):
    """
    The smallest integer k such that k*d >= n.

    Uses floor division explicitly so the result is a ceiling regardless of
    whether `/` means integer or true division.
    """
    return (n // d) + (n % d != 0)
252
# Target width of the ASCII graph drawn by test(); test() spreads the data
# over DESIRED_COLUMNS - 3 columns.
DESIRED_COLUMNS = 70

# Fixed x-axis window; only referenced from a commented-out alternative in
# test() that makes graphs from different runs line up.
START_FILES = 137000
STOP_FILES = 144000
257
def test(permutedpeerlist, iters):
    """Run go() ``iters`` times and print an ASCII graph of the results.

    The graph shows, averaged over the runs, how many servers were full
    (rows 40 down to 1) against the number of files uploaded (columns).
    ``permutedpeerlist`` is passed straight through to go().
    """
    import sys

    # The i'th element of the filledat list is how many servers got full
    # when the i'th file was uploaded.
    filledat = []
    for _iteration in range(iters):
        (servers, doubled_up_shares, doubled_up_at_tick) = go(permutedpeerlist)
        print("doubled_up_shares:  %s" % (doubled_up_shares,))
        print("doubled_up_at_tick:  %s" % (doubled_up_at_tick,))
        for server in servers:
            fidx = server.full_at_tick
            # Grow the list so that index fidx exists.
            filledat.extend([0] * (fidx - len(filledat) + 1))
            filledat[fidx] += 1

    # Skip the leading stretch of files during which no server filled up.
    startfiles = 0
    while filledat[startfiles] == 0:
        startfiles += 1
    filespercolumn = div_ceil(len(filledat) - startfiles, (DESIRED_COLUMNS - 3))

    # to make comparisons between runs line up:
    # startfiles = START_FILES
    # filespercolumn = div_ceil(STOP_FILES - startfiles, (DESIRED_COLUMNS - 3))

    # The i'th element of the compressedfilledat list is how many servers
    # got full when the filespercolumn files starting at
    # startfiles + i * filespercolumn were uploaded.
    compressedfilledat = [
        sum(filledat[idx:idx + filespercolumn])
        for idx in range(startfiles, len(filledat), filespercolumn)
    ]

    # The i'th element of the fullat list is how many servers were full by
    # the tick numbered startfiles + i * filespercolumn (on average).  A
    # running total replaces the former quadratic accumulation.
    fullat = []
    full_so_far = 0
    for num in compressedfilledat:
        full_so_far += num
        fullat.append(full_so_far / float(iters))

    # Now print it out as an ascii art graph.
    for serversfull in range(40, 0, -1):
        sys.stdout.write("%2d " % serversfull)
        for numfull in fullat:
            if int(numfull) == serversfull:
                sys.stdout.write("*")
            else:
                sys.stdout.write(" ")
        sys.stdout.write("\n")

    sys.stdout.write(" ^-- servers full\n")
    # Each x-axis label is as wide as the number of columns it covers, so
    # the column index advances by the label's length.
    idx = 0
    while idx < len(fullat):
        nextmark = "%d--^ " % (startfiles + idx * filespercolumn)
        sys.stdout.write(nextmark)
        idx += len(nextmark)

    sys.stdout.write("\nfiles uploaded --> \n")
322
323