1 | """ |
2 | I contain utilities useful for calculating servers_of_happiness, and for |
3 | reporting it in messages. |
4 | |
5 | Ported to Python 3. |
6 | """ |
7 | |
8 | from copy import deepcopy |
9 | from allmydata.immutable.happiness_upload import residual_network |
10 | from allmydata.immutable.happiness_upload import augmenting_path_for |
11 | |
12 | |
def failure_message(peer_count, k, happy, effective_happy):
    """
    Build the explanation used when an upload cannot reach the requested
    servers-of-happiness value.

    :param peer_count: number of servers on which shares could be placed
        or found.
    :param k: number of shares needed to recover the file.
    :param happy: the servers-of-happiness value that was requested.
    :param effective_happy: the size of the largest server subset found such
        that any k of them can recover the file.
    :return: a human-readable message string.
    """
    # Too few servers overall: this explanation is the most useful one, so
    # it takes precedence over the others.
    if peer_count < k:
        template = ("shares could be placed or found on only %d "
                    "server(s). "
                    "We were asked to place shares on at least %d "
                    "server(s) such that any %d of them have "
                    "enough shares to recover the file.")
        return template % (peer_count, happy, k)

    # Enough servers, but no happy subset of at least k of them exists.
    if effective_happy < k:
        template = ("shares could be placed or found on %d "
                    "server(s), but they are not spread out evenly "
                    "enough to ensure that any %d of these servers "
                    "would have enough shares to recover the file. "
                    "We were asked to place "
                    "shares on at least %d servers such that any "
                    "%d of them have enough shares to recover the "
                    "file.")
        return template % (peer_count, k, happy, k)

    # A happy subset of size >= k exists, but it is smaller than requested.
    template = ("shares could be placed on only %d server(s) "
                "such that any %d of them have enough shares "
                "to recover the file, but we were asked to "
                "place shares on at least %d such servers.")
    return template % (effective_happy, k, happy)
46 | |
47 | |
def shares_by_server(servermap):
    """
    Invert a share-oriented map into a server-oriented one.

    :param servermap: dict of shareid -> set(peerid); every value must be a
        set (checked with an assertion).
    :return: a new dict of peerid -> set(shareid). The input is not modified.
    """
    inverted = {}
    for share, holders in servermap.items():
        assert isinstance(holders, set)
        for holder in holders:
            if holder not in inverted:
                inverted[holder] = set()
            inverted[holder].add(share)
    return inverted
61 | |
def merge_servers(servermap, upload_trackers=None):
    """
    Merge upload-tracker placements into a copy of a share map.

    :param servermap: dict of shareid -> set(serverid).
    :param upload_trackers: optional set of ServerTrackers. Each tracker
        contributes its ``get_serverid()`` for every share number in its
        ``buckets``.
    :return: a deep copy of ``servermap``. The copy is returned even when no
        trackers are given, so the caller's dict and its sets are never
        mutated. When trackers are given, the copy is extended with their
        shareid -> serverid mappings.
    """
    # We add to the per-share sets below. The caller's servermap must not be
    # mutated, so work on a deep copy that shares no sets with it.
    merged = deepcopy(servermap)
    if not upload_trackers:
        return merged

    assert isinstance(merged, dict)
    assert isinstance(upload_trackers, set)

    for tracker in upload_trackers:
        # Loop-invariant for this tracker: fetch it once, not once per share.
        serverid = tracker.get_serverid()
        for shnum in tracker.buckets:
            merged.setdefault(shnum, set()).add(serverid)
    return merged
84 | |
85 | |
def servers_of_happiness(sharemap):
    """
    I accept 'sharemap', a dict of shareid -> set(peerid) mappings. I
    return the 'servers_of_happiness' number that sharemap results in.

    To calculate the 'servers_of_happiness' number for the sharemap, I
    construct a bipartite graph with servers in one partition of vertices
    and shares in the other, and with an edge between a server s and a share t
    if s is to store t. I then compute the size of a maximum matching in
    the resulting graph; this is then returned as the 'servers_of_happiness'
    for my arguments.

    For example, consider the following layout:

      server 1: shares 1, 2, 3, 4
      server 2: share 6
      server 3: share 3
      server 4: share 4
      server 5: share 2

    From this, we can construct the following graph:

      L = {server 1, server 2, server 3, server 4, server 5}
      R = {share 1, share 2, share 3, share 4, share 6}
      V = L U R
      E = {(server 1, share 1), (server 1, share 2), (server 1, share 3),
           (server 1, share 4), (server 2, share 6), (server 3, share 3),
           (server 4, share 4), (server 5, share 2)}
      G = (V, E)

    Note that G is bipartite since every edge in E has one endpoint in L
    and one endpoint in R.

    A matching in a graph G is a subset M of E such that, for any vertex
    v in V, v is incident to at most one edge of M. A maximum matching
    in G is a matching that is no smaller than any other matching. For
    this graph, a matching of cardinality 5 is:

      M = {(server 1, share 1), (server 2, share 6),
           (server 3, share 3), (server 4, share 4),
           (server 5, share 2)}

    Since G is bipartite, and since |L| = 5, we cannot have an M' such
    that |M'| > |M|. Then M is a maximum matching in G. Intuitively, and
    as long as k <= 5, we can see that the layout above has
    servers_of_happiness = 5, which matches the results here.

    An empty sharemap, or one whose shares all map to empty peer sets,
    yields 0.
    """
    if not sharemap:
        return 0
    servermap = shares_by_server(sharemap)
    if not servermap:
        # Every share maps to an empty peer set, so nothing is placed
        # anywhere and the matching is trivially empty.
        return 0
    graph = _flow_network_for(servermap)

    # XXX this core stuff is identical to
    # happiness_upload._compute_maximum_graph and we should find a way
    # to share the code.

    # This is an implementation of the Ford-Fulkerson method for finding
    # a maximum flow in a flow network applied to a bipartite graph.
    # Specifically, it is the Edmonds-Karp algorithm, since it uses a
    # BFS to find the shortest augmenting path at each iteration, if one
    # exists.
    #
    # The implementation here is an adaptation of an algorithm described in
    # "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662.
    dim = len(graph)
    flow_function = [[0] * dim for _ in range(dim)]
    residual_graph, residual_function = residual_network(graph, flow_function)
    while True:
        # Search for an augmenting path once per iteration. The previous
        # version ran the same BFS twice: once for the loop test and once
        # more to fetch the path.
        path = augmenting_path_for(residual_graph)
        if not path:
            break
        # Delta is the largest amount that we can increase flow across
        # all of the edges in path. Because of the way that the residual
        # function is constructed, f[u][v] for a particular edge (u, v)
        # is the amount of unused capacity on that edge. Taking the
        # minimum of a list of those values for each edge in the
        # augmenting path gives us our delta.
        delta = min(residual_function[u][v] for (u, v) in path)
        for (u, v) in path:
            flow_function[u][v] += delta
            flow_function[v][u] -= delta
        residual_graph, residual_function = residual_network(graph,
                                                             flow_function)
    num_servers = len(servermap)
    # The value of a flow is the total flow out of the source vertex
    # (vertex 0, in our graph). We could just as well sum across all of
    # f[0], but we know that vertex 0 only has edges to the servers in
    # our graph, so we can stop after summing flow across those. The
    # value of a flow computed in this way is the size of a maximum
    # matching on the bipartite graph described above.
    return sum(flow_function[0][v] for v in range(1, num_servers + 1))
175 | |
def _flow_network_for(servermap):
    """
    Turn a peerid -> set(shareid) map into an adjacency-list flow network
    suitable for Edmonds-Karp.

    The returned graph G = (V, E) has:
      V = { peerid in servermap } U { shareid in servermap } U {s, t}
      E = {(s, peerid) for each peerid}
          U {(peerid, shareid) if peerid is to store shareid }
          U {(shareid, t) for each shareid}

    Vertex layout (all indices are integers with no gaps):
      0                              source s
      1 .. num_servers               servers
      num_servers+1 .. sink-1        shares
      num_servers+num_shares+1       sink t (the last entry)

    Without s and t the graph is bipartite.
    """
    # Peer and share ids are not usable as list indices (servers have
    # non-integral ids and share numbers may have holes), so renumber
    # everything densely. Numbering starts at 1 to leave index 0 free for
    # the source.
    reindexed, num_shares = _reindex(servermap, base_index=1)
    sink = len(reindexed) + num_shares + 1

    # Source -> every server.
    graph = [list(reindexed)]
    # Server -> each share it stores (or will store). This relies on
    # _reindex inserting server keys in index order 1, 2, ...
    graph.extend(reindexed.values())
    # Share -> sink, one fresh list per share.
    graph.extend([sink] for _ in range(num_shares))
    # The sink has no outgoing edges.
    graph.append([])
    return graph
215 | |
216 | |
217 | # XXX warning: this is different from happiness_upload's _reindex! |
218 | def _reindex(servermap, base_index): |
219 | """ |
220 | Given servermap, I map peerids and shareids to integers that don't |
221 | conflict with each other, so they're useful as indices in a graph. I |
222 | return a servermap that is reindexed appropriately, and also the |
223 | number of distinct shares in the resulting servermap as a convenience |
224 | for my caller. base_index tells me where to start indexing. |
225 | """ |
226 | shares = {} # shareid -> vertex index |
227 | num = base_index |
228 | ret = {} # peerid -> [shareid], a reindexed servermap. |
229 | # Number the servers first |
230 | for k in servermap: |
231 | ret[num] = servermap[k] |
232 | num += 1 |
233 | # Number the shares |
234 | for k in ret: |
235 | for shnum in ret[k]: |
236 | if shnum not in shares: |
237 | shares[shnum] = num |
238 | num += 1 |
239 | ret[k] = [shares[x] for x in ret[k]] |
240 | return (ret, len(shares)) |