Changeset ef17ef2c in trunk
- Timestamp:
- 2017-06-05T22:26:46Z (8 years ago)
- Branches:
- master
- Children:
- e68b331
- Parents:
- 42011e7
- git-author:
- meejah <meejah@…> (2017-01-20 21:58:49)
- git-committer:
- meejah <meejah@…> (2017-06-05 22:26:46)
- Location:
- src/allmydata
- Files:
-
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
TabularUnified src/allmydata/immutable/happiness_upload.py ¶
r42011e7 ref17ef2c 74 74 return (new_graph, cf) 75 75 76 def _query_all_shares(servermap, readonly_peers):77 readonly_shares = set()78 readonly_map = {}79 for peer in servermap:80 if peer in readonly_peers:81 readonly_map.setdefault(peer, servermap[peer])82 for share in servermap[peer]:83 readonly_shares.add(share)84 return readonly_shares85 86 87 76 def _convert_mappings(index_to_peer, index_to_share, maximum_graph): 88 77 """ … … 277 266 result[k] = existing.union(v) 278 267 268 279 269 def calculate_happiness(mappings): 280 270 """ 281 271 I calculate the happiness of the generated mappings 282 272 """ 283 happiness = 0 284 for share in mappings: 285 if mappings[share] is not None: 286 happiness += 1 287 return happiness 273 unique_peers = {list(v)[0] for k, v in mappings.items()} 274 return len(unique_peers) 275 288 276 289 277 def share_placement(peers, readonly_peers, shares, peers_to_shares={}): … … 291 279 :param servers: ordered list of servers, "Maybe *2N* of them." 292 280 """ 293 # "1. Query all servers for existing shares."294 #shares = _query_all_shares(servers, peers)295 #print("shares", shares)296 297 281 # "2. Construct a bipartite graph G1 of *readonly* servers to pre-existing 298 282 # shares, where an edge exists between an arbitrary readonly server S and an -
TabularUnified src/allmydata/immutable/upload.py ¶
r42011e7 ref17ef2c 202 202 return "%s: %s" % (shnum, bucketwriter.get_servername(),) 203 203 204 204 205 class PeerSelector(): 205 206 implements(IPeerSelector) … … 259 260 def is_healthy(self): 260 261 return self.min_happiness <= self.happiness 262 261 263 262 264 class Tahoe2ServerSelector(log.PrefixingLogMixin): … … 555 557 merged = merge_servers(self.peer_selector.get_sharemap_of_preexisting_shares(), self.use_trackers) 556 558 effective_happiness = servers_of_happiness(self.peer_selector.get_allocations()) 557 #effective_happiness = self.peer_selector.happiness558 print "effective_happiness %s" % effective_happiness559 559 if effective_happiness < self.servers_of_happiness: 560 msg = failure_message(len(self.serverids_with_shares), 561 self.needed_shares, 562 self.servers_of_happiness, 563 effective_happiness) 560 msg = failure_message( 561 peer_count=len(self.serverids_with_shares), 562 k=self.needed_shares, 563 happy=self.servers_of_happiness, 564 effective_happy=effective_happiness, 565 ) 564 566 msg = ("server selection failed for %s: %s (%s), merged=%s" % 565 567 (self, msg, self._get_progress_message(), -
TabularUnified src/allmydata/test/test_happiness.py ¶
r42011e7 ref17ef2c 97 97 } 98 98 self.assertEqual(expected, places0) 99 100 def test_unhappy(self): 101 102 shares = { 103 'share1', 'share2', 'share3', 'share4', 'share5', 104 } 105 peers = { 106 'peer1', 'peer2', 'peer3', 'peer4', 107 } 108 readonly_peers = set() 109 peers_to_shares = { 110 } 111 112 places = happiness_upload.share_placement(peers, readonly_peers, shares, peers_to_shares) 113 happiness = happiness_upload.calculate_happiness(places) 114 self.assertEqual(4, happiness) 115 116 def test_calc_happy(self): 117 sharemap = { 118 0: set(["\x0e\xd6\xb3>\xd6\x85\x9d\x94')'\xf03:R\x88\xf1\x04\x1b\xa4", 119 '\x8de\x1cqM\xba\xc3\x0b\x80\x9aC<5\xfc$\xdc\xd5\xd3\x8b&', 120 '\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t', 121 '\xc4\x83\x9eJ\x7f\xac| .\xc90\xf4b\xe4\x92\xbe\xaa\xe6\t\x80']), 122 1: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']), 123 2: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']), 124 3: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']), 125 4: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']), 126 5: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']), 127 6: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']), 128 7: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']), 129 8: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']), 130 9: set(['\xb9\xa3N\x80u\x9c_\xf7\x97FSS\xa7\xbd\x02\xf9f$:\t']), 131 } 132 happy = happiness_upload.calculate_happiness(sharemap) 133 self.assertEqual(2, happy) -
TabularUnified src/allmydata/test/test_upload.py ¶
r42011e7 ref17ef2c 941 941 self.set_up_grid(num_servers=4) 942 942 c = self.g.clients[0] 943 DATA = upload.Data(100 * "kittens", convergence="")943 DATA = upload.Data(100 * "kittens", convergence="") 944 944 # These parameters are unsatisfiable with only 4 servers, but should 945 945 # work with 5, as long as the original 4 are not stuck in the open -
TabularUnified src/allmydata/util/happinessutil.py ¶
r42011e7 ref17ef2c 5 5 6 6 from copy import deepcopy 7 from allmydata.immutable.happiness_upload import share_placement, calculate_happiness 7 from allmydata.immutable.happiness_upload import share_placement 8 from allmydata.immutable.happiness_upload import calculate_happiness 9 from allmydata.immutable.happiness_upload import residual_network 10 from allmydata.immutable.happiness_upload import bfs 11 from allmydata.immutable.happiness_upload import augmenting_path_for 12 8 13 9 14 def failure_message(peer_count, k, happy, effective_happy): … … 79 84 return servermap 80 85 86 81 87 def servers_of_happiness(sharemap): 82 peers = sharemap.values() 83 if len(peers) == 1: 84 peers = peers[0] 85 else: 86 peers = [list(x)[0] for x in peers] # XXX 87 shares = sharemap.keys() 88 readonly_peers = set() # XXX 89 peers_to_shares = shares_by_server(sharemap) 90 places0 = share_placement(peers, readonly_peers, shares, peers_to_shares) 91 return calculate_happiness(places0) 88 """ 89 I accept 'sharemap', a dict of shareid -> set(peerid) mappings. I 90 return the 'servers_of_happiness' number that sharemap results in. 91 92 To calculate the 'servers_of_happiness' number for the sharemap, I 93 construct a bipartite graph with servers in one partition of vertices 94 and shares in the other, and with an edge between a server s and a share t 95 if s is to store t. I then compute the size of a maximum matching in 96 the resulting graph; this is then returned as the 'servers_of_happiness' 97 for my arguments. 
98 99 For example, consider the following layout: 100 101 server 1: shares 1, 2, 3, 4 102 server 2: share 6 103 server 3: share 3 104 server 4: share 4 105 server 5: share 2 106 107 From this, we can construct the following graph: 108 109 L = {server 1, server 2, server 3, server 4, server 5} 110 R = {share 1, share 2, share 3, share 4, share 6} 111 V = L U R 112 E = {(server 1, share 1), (server 1, share 2), (server 1, share 3), 113 (server 1, share 4), (server 2, share 6), (server 3, share 3), 114 (server 4, share 4), (server 5, share 2)} 115 G = (V, E) 116 117 Note that G is bipartite since every edge in e has one endpoint in L 118 and one endpoint in R. 119 120 A matching in a graph G is a subset M of E such that, for any vertex 121 v in V, v is incident to at most one edge of M. A maximum matching 122 in G is a matching that is no smaller than any other matching. For 123 this graph, a matching of cardinality 5 is: 124 125 M = {(server 1, share 1), (server 2, share 6), 126 (server 3, share 3), (server 4, share 4), 127 (server 5, share 2)} 128 129 Since G is bipartite, and since |L| = 5, we cannot have an M' such 130 that |M'| > |M|. Then M is a maximum matching in G. Intuitively, and 131 as long as k <= 5, we can see that the layout above has 132 servers_of_happiness = 5, which matches the results here. 133 """ 134 if sharemap == {}: 135 return 0 136 servermap = shares_by_server(sharemap) 137 graph = _flow_network_for(servermap) 138 139 # XXX this core stuff is identical to 140 # happiness_upload._compute_maximum_graph and we should find a way 141 # to share the code. 142 143 # This is an implementation of the Ford-Fulkerson method for finding 144 # a maximum flow in a flow network applied to a bipartite graph. 145 # Specifically, it is the Edmonds-Karp algorithm, since it uses a 146 # BFS to find the shortest augmenting path at each iteration, if one 147 # exists. 
148 # 149 # The implementation here is an adaptation of an algorithm described in 150 # "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662. 151 dim = len(graph) 152 flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)] 153 residual_graph, residual_function = residual_network(graph, flow_function) 154 while augmenting_path_for(residual_graph): 155 path = augmenting_path_for(residual_graph) 156 # Delta is the largest amount that we can increase flow across 157 # all of the edges in path. Because of the way that the residual 158 # function is constructed, f[u][v] for a particular edge (u, v) 159 # is the amount of unused capacity on that edge. Taking the 160 # minimum of a list of those values for each edge in the 161 # augmenting path gives us our delta. 162 delta = min(map(lambda (u, v), rf=residual_function: rf[u][v], 163 path)) 164 for (u, v) in path: 165 flow_function[u][v] += delta 166 flow_function[v][u] -= delta 167 residual_graph, residual_function = residual_network(graph, 168 flow_function) 169 num_servers = len(servermap) 170 # The value of a flow is the total flow out of the source vertex 171 # (vertex 0, in our graph). We could just as well sum across all of 172 # f[0], but we know that vertex 0 only has edges to the servers in 173 # our graph, so we can stop after summing flow across those. The 174 # value of a flow computed in this way is the size of a maximum 175 # matching on the bipartite graph described above. 176 return sum([flow_function[0][v] for v in xrange(1, num_servers+1)]) 177 178 def _flow_network_for(servermap): 179 """ 180 I take my argument, a dict of peerid -> set(shareid) mappings, and 181 turn it into a flow network suitable for use with Edmonds-Karp. I 182 then return the adjacency list representation of that network.
183 184 Specifically, I build G = (V, E), where: 185 V = { peerid in servermap } U { shareid in servermap } U {s, t} 186 E = {(s, peerid) for each peerid} 187 U {(peerid, shareid) if peerid is to store shareid } 188 U {(shareid, t) for each shareid} 189 190 s and t will be source and sink nodes when my caller starts treating 191 the graph I return like a flow network. Without s and t, the 192 returned graph is bipartite. 193 """ 194 # Servers don't have integral identifiers, and we can't make any 195 # assumptions about the way shares are indexed -- it's possible that 196 # there are missing shares, for example. So before making a graph, 197 # we re-index so that all of our vertices have integral indices, and 198 # that there aren't any holes. We start indexing at 1, so that we 199 # can add a source node at index 0. 200 servermap, num_shares = _reindex(servermap, base_index=1) 201 num_servers = len(servermap) 202 graph = [] # index -> [index], an adjacency list 203 # Add an entry at the top (index 0) that has an edge to every server 204 # in servermap 205 graph.append(servermap.keys()) 206 # For each server, add an entry that has an edge to every share that it 207 # contains (or will contain). 208 for k in servermap: 209 graph.append(servermap[k]) 210 # For each share, add an entry that has an edge to the sink. 211 sink_num = num_servers + num_shares + 1 212 for i in xrange(num_shares): 213 graph.append([sink_num]) 214 # Add an empty entry for the sink, which has no outbound edges. 215 graph.append([]) 216 return graph 217 218 219 # XXX warning: this is different from happiness_upload's _reindex! 220 def _reindex(servermap, base_index): 221 """ 222 Given servermap, I map peerids and shareids to integers that don't 223 conflict with each other, so they're useful as indices in a graph. I 224 return a servermap that is reindexed appropriately, and also the 225 number of distinct shares in the resulting servermap as a convenience 226 for my caller. 
base_index tells me where to start indexing. 227 """ 228 shares = {} # shareid -> vertex index 229 num = base_index 230 ret = {} # peerid -> [shareid], a reindexed servermap. 231 # Number the servers first 232 for k in servermap: 233 ret[num] = servermap[k] 234 num += 1 235 # Number the shares 236 for k in ret: 237 for shnum in ret[k]: 238 if not shares.has_key(shnum): 239 shares[shnum] = num 240 num += 1 241 ret[k] = map(lambda x: shares[x], ret[k]) 242 return (ret, len(shares))
Note: See TracChangeset
for help on using the changeset viewer.