source: trunk/src/allmydata/immutable/happiness_upload.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 15.2 KB
Line 
1"""
2Algorithms for figuring out happiness, the number of unique nodes the data is
3on.
4
5Ported to Python 3.
6"""
7
from collections import deque
from queue import PriorityQueue
9
10
def augmenting_path_for(graph):
    """
    I return an augmenting path, if there is one, from the source node
    to the sink node in the flow network represented by my graph argument.
    If there is no augmenting path, I return False. I assume that the
    source node is at index 0 of graph, and the sink node is at the last
    index. I also assume that graph is a flow network in adjacency list
    form.
    """
    predecessors = bfs(graph, 0)
    sink = len(graph) - 1
    # If BFS never reached the sink it has no predecessor, so there is
    # no augmenting path.
    if not predecessors[sink]:
        return False
    # Walk the predecessor links back from the sink, collecting edges
    # (u, v), then reverse to get source-to-sink order.
    path = []
    node = sink
    while node != 0:
        path.append((predecessors[node], node))
        node = predecessors[node]
    path.reverse()
    return path
29
def bfs(graph, s):
    """
    Perform a BFS on graph starting at s, where graph is a graph in
    adjacency list form, and s is a node in graph. I return the
    predecessor table that the BFS generates.

    :param graph: adjacency lists indexed by vertex number
    :param s: index of the start vertex
    :returns: list mapping each vertex to its BFS-tree predecessor
        (None for unreached vertices, and for s itself)
    """
    # This is an adaptation of the BFS described in "Introduction to
    # Algorithms", Cormen et al, 2nd ed., p. 532.
    # WHITE vertices are those that we haven't seen or explored yet.
    WHITE = 0
    # GRAY vertices are those we have seen, but haven't explored yet
    GRAY  = 1
    # BLACK vertices are those we have seen and explored
    BLACK = 2
    color        = [WHITE] * len(graph)
    predecessor  = [None] * len(graph)
    distance     = [-1] * len(graph)
    # deque gives O(1) popleft; the previous list.pop(0) was O(n) per
    # dequeue, which matters because Edmonds-Karp runs this BFS once
    # per augmenting path.
    queue = deque([s]) # vertices that we haven't explored yet.
    color[s] = GRAY
    distance[s] = 0
    while queue:
        n = queue.popleft()
        for v in graph[n]:
            if color[v] == WHITE:
                color[v] = GRAY
                distance[v] = distance[n] + 1
                predecessor[v] = n
                queue.append(v)
        color[n] = BLACK
    return predecessor
60
def residual_network(graph, f):
    """
    I return the residual network and residual capacity function of the
    flow network represented by my graph and f arguments. graph is a
    flow network in adjacency-list form, and f is a flow in graph.
    """
    num_nodes = len(graph)
    new_graph = [[] for _ in range(num_nodes)]
    cf = [[0] * num_nodes for _ in range(num_nodes)]
    for u in range(num_nodes):
        for v in graph[u]:
            if f[u][v] == 1:
                # The edge (u, v) is saturated: add the residual edge
                # (v, u) with capacity 1, meaning one unit of flow can
                # be removed from (u, v).
                new_graph[v].append(u)
                cf[v][u] = 1
                cf[u][v] = -1
            else:
                # The edge (u, v) is unused, so it keeps its forward
                # residual capacity.
                new_graph[u].append(v)
                cf[u][v] = 1
                cf[v][u] = -1
    return (new_graph, cf)
84
85
def calculate_happiness(mappings):
    """
    :param mappings: a dict mapping 'share' -> 'peer'

    :returns: the happiness, which is the number of unique peers we've
        placed shares on.
    """
    distinct_peers = frozenset(mappings.values())
    # Every share must actually be placed somewhere.
    assert None not in distinct_peers
    return len(distinct_peers)
96
97
def _calculate_mappings(peers, shares, servermap=None):
    """
    Given a set of peers, a set of shares, and a dictionary of server ->
    set(shares), determine how the uploader should allocate shares. If a
    servermap is supplied, determine which existing allocations should be
    preserved. If servermap is None, calculate the maximum matching of the
    bipartite graph (U, V, E) such that:

    U = peers
    V = shares
    E = peers x shares

    Returns a dictionary {share -> set(peer)}, indicating that the share
    should be placed on each peer in the set. If a share's corresponding
    value is None, the share can be placed on any server. Note that the set
    of peers should only be one peer when returned, but it is possible to
    duplicate shares by adding additional servers to the set.
    """
    # Give peers indices 1..len(peers) and shares the indices after
    # them; index 0 is the flow-network source.
    peer_to_index, index_to_peer = _reindex(peers, 1)
    share_to_index, index_to_share = _reindex(shares, len(peers) + 1)
    share_indices = [share_to_index[s] for s in shares]
    if servermap:
        # Preserve existing placements: edges mirror the servermap.
        graph = _servermap_flow_graph(peers, shares, servermap)
    else:
        # No existing placements: every peer may take every share.
        graph = _flow_network([peer_to_index[p] for p in peers], share_indices)
    matching = _compute_maximum_graph(graph, share_indices)
    return _convert_mappings(index_to_peer, index_to_share, matching)
126
127
def _compute_maximum_graph(graph, shareIndices):
    """
    This is an implementation of the Ford-Fulkerson method for finding
    a maximum flow in a flow network applied to a bipartite graph.
    Specifically, it is the Edmonds-Karp algorithm, since it uses a
    BFS to find the shortest augmenting path at each iteration, if one
    exists.

    The implementation here is an adapation of an algorithm described in
    "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662.

    :param graph: flow network in adjacency-list form (source at index
        0, sink at the last index)
    :param shareIndices: graph indices that represent shares
    :returns: dict of share index -> matched peer index, or None for an
        unmatched share
    """

    if not graph:
        return {}

    dim = len(graph)
    flow_function = [[0 for sh in range(dim)] for s in range(dim)]
    residual_graph, residual_function = residual_network(graph, flow_function)

    # Push flow along the shortest augmenting path until none remains.
    # (The previous version computed the augmenting path twice per
    # iteration -- once in the while test and once in the body -- and
    # rebuilt the residual network once per *edge* of the path, though
    # only the rebuild after the last edge was ever read.)
    path = augmenting_path_for(residual_graph)
    while path:
        # Delta is the largest amount that we can increase flow across
        # all of the edges in path. Because of the way that the residual
        # function is constructed, f[u][v] for a particular edge (u, v)
        # is the amount of unused capacity on that edge. Taking the
        # minimum of a list of those values for each edge in the
        # augmenting path gives us our delta.
        delta = min(residual_function[u][v] for (u, v) in path)
        for (u, v) in path:
            flow_function[u][v] += delta
            flow_function[v][u] -= delta
        residual_graph, residual_function = residual_network(graph, flow_function)
        path = augmenting_path_for(residual_graph)

    # In the final residual graph, a matched share's adjacency list
    # points back at the peer its flow came from; a share that still
    # points only at the sink is unmatched.
    new_mappings = {}
    for shareIndex in shareIndices:
        peer = residual_graph[shareIndex]
        if peer == [dim - 1]:
            new_mappings.setdefault(shareIndex, None)
        else:
            new_mappings.setdefault(shareIndex, peer[0])

    return new_mappings
170
171
172def _extract_ids(mappings):
173    shares = set()
174    peers = set()
175    for share in mappings:
176        if mappings[share] == None:
177            pass
178        else:
179            shares.add(share)
180            for item in mappings[share]:
181                peers.add(item)
182    return (peers, shares)
183
184def _distribute_homeless_shares(mappings, homeless_shares, peers_to_shares):
185    """
186    Shares which are not mapped to a peer in the maximum spanning graph
187    still need to be placed on a server. This function attempts to
188    distribute those homeless shares as evenly as possible over the
189    available peers. If possible a share will be placed on the server it was
190    originally on, signifying the lease should be renewed instead.
191    """
192    #print("mappings, homeless_shares, peers_to_shares %s %s %s" % (mappings, homeless_shares, peers_to_shares))
193    servermap_peerids = set([key for key in peers_to_shares])
194    servermap_shareids = set()
195    for key in sorted(peers_to_shares.keys()):
196        # XXX maybe sort?
197        for share in peers_to_shares[key]:
198            servermap_shareids.add(share)
199
200    # First check to see if the leases can be renewed.
201    to_distribute = set()
202    for share in homeless_shares:
203        if share in servermap_shareids:
204            for peerid in peers_to_shares:
205                if share in peers_to_shares[peerid]:
206                    mappings[share] = set([peerid])
207                    break
208        else:
209            to_distribute.add(share)
210    # This builds a priority queue of peers with the number of shares
211    # each peer holds as the priority.
212    priority = {}
213    pQueue = PriorityQueue()
214    for peerid in servermap_peerids:
215        priority.setdefault(peerid, 0)
216    for share in mappings:
217        if mappings[share] is not None:
218            for peer in mappings[share]:
219                if peer in servermap_peerids:
220                    priority[peer] += 1
221    if priority == {}:
222        return
223    for peerid in priority:
224        pQueue.put((priority[peerid], peerid))
225    # Distribute the shares to peers with the lowest priority.
226    for share in to_distribute:
227        peer = pQueue.get()
228        mappings[share] = set([peer[1]])
229        pQueue.put((peer[0]+1, peer[1]))
230
231def _convert_mappings(index_to_peer, index_to_share, maximum_graph):
232    """
233    Now that a maximum spanning graph has been found, convert the indexes
234    back to their original ids so that the client can pass them to the
235    uploader.
236    """
237
238    converted_mappings = {}
239    for share in maximum_graph:
240        peer = maximum_graph[share]
241        if peer == None:
242            converted_mappings.setdefault(index_to_share[share], None)
243        else:
244            converted_mappings.setdefault(index_to_share[share], set([index_to_peer[peer]]))
245    return converted_mappings
246
247
def _servermap_flow_graph(peers, shares, servermap):
    """
    Generates a flow network of peerIndices to shareIndices from a server map
    of 'peer' -> ['shares']. According to Wikipedia, "a flow network is a
    directed graph where each edge has a capacity and each edge receives a flow.
    The amount of flow on an edge cannot exceed the capacity of the edge." This
    is necessary because in order to find the maximum spanning, the Edmonds-Karp algorithm
    converts the problem into a maximum flow problem.

    :param peers: iterable of peer ids
    :param shares: iterable of share ids
    :param servermap: dict of peer -> shares that peer holds
    :returns: adjacency-list flow network: index 0 is the source
        (connected to every peer), each peer connects to the shares it
        holds, each share connects to the sink (the last index)
    """
    if servermap == {}:
        return []

    peer_to_index, index_to_peer = _reindex(peers, 1)
    share_to_index, index_to_share = _reindex(shares, len(peers) + 1)
    graph = []
    sink_num = len(peers) + len(shares) + 1
    graph.append([peer_to_index[peer] for peer in peers])
    for peer in peers:
        # BUGFIX: build a fresh list per peer. Previously a single list
        # was created once, appended to across iterations, and inserted
        # by reference for every peer -- so each peer's adjacency list
        # aliased the same accumulated list and every peer appeared to
        # hold every share, contradicting the servermap semantics.
        indexedShares = []
        if peer in servermap:
            for s in servermap[peer]:
                if s in share_to_index:
                    indexedShares.append(share_to_index[s])
        graph.insert(peer_to_index[peer], indexedShares)
    for share in shares:
        graph.insert(share_to_index[share], [sink_num])
    graph.append([])
    return graph
278
279
280def _reindex(items, base):
281    """
282    I take an iteratble of items and give each item an index to be used in
283    the construction of a flow network. Indices for these items start at base
284    and continue to base + len(items) - 1.
285
286    I return two dictionaries: ({item: index}, {index: item})
287    """
288    item_to_index = {}
289    index_to_item = {}
290    for item in items:
291        item_to_index.setdefault(item, base)
292        index_to_item.setdefault(base, item)
293        base += 1
294    return (item_to_index, index_to_item)
295
296
297def _flow_network(peerIndices, shareIndices):
298    """
299    Given set of peerIndices and a set of shareIndices, I create a flow network
300    to be used by _compute_maximum_graph. The return value is a two
301    dimensional list in the form of a flow network, where each index represents
302    a node, and the corresponding list represents all of the nodes it is connected
303    to.
304
305    This function is similar to allmydata.util.happinessutil.flow_network_for, but
306    we connect every peer with all shares instead of reflecting a supplied servermap.
307    """
308    graph = []
309    # The first entry in our flow network is the source.
310    # Connect the source to every server.
311    graph.append(peerIndices)
312    sink_num = len(peerIndices + shareIndices) + 1
313    # Connect every server with every share it can possibly store.
314    for peerIndex in peerIndices:
315        graph.insert(peerIndex, shareIndices)
316    # Connect every share with the sink.
317    for shareIndex in shareIndices:
318        graph.insert(shareIndex, [sink_num])
319    # Add an empty entry for the sink.
320    graph.append([])
321    return graph
322
def share_placement(peers, readonly_peers, shares, peers_to_shares):
    """
    Generates the allocations the upload should based on the given
    information. We construct a dictionary of 'share_num' ->
    'server_id' and return it to the caller. Existing allocations
    appear as placements because attempting to place an existing
    allocation will renew the share.

    For more information on the algorithm this class implements, refer to
    docs/specifications/servers-of-happiness.rst

    :param peers: set of all candidate peer ids
    :param readonly_peers: subset of ``peers`` that cannot accept new shares
    :param shares: set of share ids to place
    :param peers_to_shares: dict of peer -> shares it already holds
    :returns: dict of share id -> peer id
    """
    if not peers:
        return dict()

    # First calculate share placement for the readonly servers.
    readonly_shares = set()
    readonly_map = {}
    for peer in sorted(peers_to_shares.keys()):
        if peer in readonly_peers:
            readonly_map.setdefault(peer, peers_to_shares[peer])
            for share in peers_to_shares[peer]:
                readonly_shares.add(share)

    readonly_mappings = _calculate_mappings(readonly_peers, readonly_shares, readonly_map)
    used_peers, used_shares = _extract_ids(readonly_mappings)

    # Calculate share placement for the remaining existing allocations
    new_peers = set(peers) - used_peers
    # Squash a list of sets into one set
    new_shares = shares - used_shares

    # Strip the already-used peers and shares out of the servermap so
    # the next matching pass only sees what is still available.
    servermap = peers_to_shares.copy()
    for peer in sorted(peers_to_shares.keys()):
        if peer in used_peers:
            servermap.pop(peer, None)
        else:
            servermap[peer] = set(servermap[peer]) - used_shares
            if servermap[peer] == set():
                servermap.pop(peer, None)
                # allmydata.test.test_upload.EncodingParameters.test_exception_messages_during_server_selection
                # allmydata.test.test_upload.EncodingParameters.test_problem_layout_comment_52
                # both ^^ trigger a "keyerror" here .. just ignoring is right? (fixes the tests, but ...)
                try:
                    new_peers.remove(peer)
                except KeyError:
                    pass

    existing_mappings = _calculate_mappings(new_peers, new_shares, servermap)
    existing_peers, existing_shares = _extract_ids(existing_mappings)

    # Calculate share placement for the remaining peers and shares which
    # won't be preserved by existing allocations.
    new_peers = new_peers - existing_peers - used_peers


    new_shares = new_shares - existing_shares - used_shares
    new_mappings = _calculate_mappings(new_peers, new_shares)
    #print("new_peers %s" % new_peers)
    #print("new_mappings %s" % new_mappings)
    # Merge the three placement passes; key sets are disjoint by
    # construction, so later passes cannot clobber earlier ones.
    mappings = dict(list(readonly_mappings.items()) + list(existing_mappings.items()) + list(new_mappings.items()))
    homeless_shares = set()
    for share in mappings:
        if mappings[share] is None:
            homeless_shares.add(share)
    if len(homeless_shares) != 0:
        # 'servermap' should contain only read/write peers
        _distribute_homeless_shares(
            mappings, homeless_shares,
            {
                k: v
                for k, v in list(peers_to_shares.items())
                if k not in readonly_peers
            }
        )

    # now, if any share is *still* mapped to None that means "don't
    # care which server it goes on", so we place it on a round-robin
    # of read-write servers

    def round_robin(peers):
        # Cycle through the peers forever.
        while True:
            for peer in peers:
                yield peer
    peer_iter = round_robin(peers - readonly_peers)

    # Flatten set-valued placements to a single peer id per share,
    # falling back to the round-robin for "don't care" shares.
    return {
        k: v.pop() if v else next(peer_iter)
        for k, v in list(mappings.items())
    }
Note: See TracBrowser for help on using the repository browser.