source: trunk/src/allmydata/immutable/happiness_upload.py @ 5042586

Last change on this file since 5042586 was 5042586, checked in by Itamar Turner-Trauring <itamar@…>, at 2020-08-11T18:54:12Z

Docstring.

  • Property mode set to 100644
File size: 15.6 KB
Line 
1"""
2Algorithms for figuring out happiness, the number of unique nodes the data is
3on.
4
5Ported to Python 3.
6"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from future.utils import PY2
if PY2:
    # We omit dict, just in case newdict breaks things for external Python 2 code.
    from builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, list, object, range, str, max, min  # noqa: F401

from collections import deque
from queue import PriorityQueue
18
19
def augmenting_path_for(graph):
    """
    I return an augmenting path, if there is one, from the source node
    to the sink node in the flow network represented by my graph argument.
    If there is no augmenting path, I return False. I assume that the
    source node is at index 0 of graph, and the sink node is at the last
    index. I also assume that graph is a flow network in adjacency list
    form.
    """
    bfs_tree = bfs(graph, 0)
    sink = len(graph) - 1
    # NOTE(review): this truthiness test treats a predecessor of 0 (the
    # source) the same as None; that is only safe because the flow
    # networks built in this module never connect source directly to
    # sink -- confirm before reusing on arbitrary graphs.
    if bfs_tree[sink]:
        n = sink
        path = []  # [(u, v)], where u and v are vertices in the graph
        # Walk predecessors sink -> source, then reverse once at the
        # end: appending is O(1) where the original insert(0, ...) made
        # path construction quadratic in the path length.
        while n != 0:
            path.append((bfs_tree[n], n))
            n = bfs_tree[n]
        path.reverse()
        return path
    return False
38
def bfs(graph, s):
    """
    Perform a BFS on graph starting at s, where graph is a graph in
    adjacency list form, and s is a node in graph. I return the
    predecessor table that the BFS generates.
    """
    # This is an adaptation of the BFS described in "Introduction to
    # Algorithms", Cormen et al, 2nd ed., p. 532.
    # WHITE vertices are those that we haven't seen or explored yet.
    WHITE = 0
    # GRAY vertices are those we have seen, but haven't explored yet
    GRAY  = 1
    # BLACK vertices are those we have seen and explored
    BLACK = 2
    color        = [WHITE for i in range(len(graph))]
    predecessor  = [None for i in range(len(graph))]
    distance     = [-1 for i in range(len(graph))]
    # deque gives O(1) popleft; the original list.pop(0) shifted the
    # whole queue on every dequeue (O(n) each, O(n^2) total).
    queue = deque([s])  # vertices that we haven't explored yet.
    color[s] = GRAY
    distance[s] = 0
    while queue:
        n = queue.popleft()
        for v in graph[n]:
            if color[v] == WHITE:
                color[v] = GRAY
                distance[v] = distance[n] + 1
                predecessor[v] = n
                queue.append(v)
        color[n] = BLACK
    return predecessor
69
def residual_network(graph, f):
    """
    Build the residual network and residual-capacity function of the
    flow network represented by the graph and f arguments. graph is a
    flow network in adjacency-list form, and f is a flow in graph.

    :returns: a tuple (residual_graph, residual_capacity).
    """
    num_nodes = len(graph)
    residual = [[] for _ in range(num_nodes)]
    capacity = [[0] * num_nodes for _ in range(num_nodes)]
    for u in range(num_nodes):
        for v in graph[u]:
            if f[u][v] == 1:
                # Edge (u, v) is saturated: the residual network gets
                # the reverse edge (v, u) with capacity 1, meaning one
                # unit of flow can be pushed back along (u, v).
                residual[v].append(u)
                capacity[v][u] = 1
                capacity[u][v] = -1
            else:
                # Edge (u, v) carries no flow yet, so it keeps its
                # forward capacity in the residual network.
                residual[u].append(v)
                capacity[u][v] = 1
                capacity[v][u] = -1
    return (residual, capacity)
93
94
def calculate_happiness(mappings):
    """
    Compute the happiness of a share placement.

    :param mappings: a dict mapping 'share' -> 'peer'

    :returns: the happiness, i.e. the number of distinct peers that
        hold at least one share.
    """
    distinct_peers = frozenset(mappings.values())
    # Every share is expected to have been assigned a real peer.
    assert None not in distinct_peers
    return len(distinct_peers)
105
106
def _calculate_mappings(peers, shares, servermap=None):
    """
    Decide how the uploader should allocate shares across peers.

    Given a set of peers, a set of shares, and a dictionary of server ->
    set(shares), determine how the uploader should allocate shares. If a
    servermap is supplied, determine which existing allocations should be
    preserved. If servermap is None, calculate the maximum matching of the
    bipartite graph (U, V, E) such that:

    U = peers
    V = shares
    E = peers x shares

    :returns: a dictionary {share -> set(peer)}, indicating that the share
        should be placed on each peer in the set. If a share's
        corresponding value is None, the share can be placed on any
        server. Note that the set of peers should only be one peer when
        returned, but it is possible to duplicate shares by adding
        additional servers to the set.
    """
    # Peers get indices 1..len(peers); shares follow immediately after,
    # so every node in the flow network has a unique index.
    peer_to_index, index_to_peer = _reindex(peers, 1)
    share_to_index, index_to_share = _reindex(shares, len(peers) + 1)
    share_indices = [share_to_index[share] for share in shares]
    if servermap:
        flow_graph = _servermap_flow_graph(peers, shares, servermap)
    else:
        peer_indices = [peer_to_index[peer] for peer in peers]
        flow_graph = _flow_network(peer_indices, share_indices)
    matching = _compute_maximum_graph(flow_graph, share_indices)
    return _convert_mappings(index_to_peer, index_to_share, matching)
135
136
def _compute_maximum_graph(graph, shareIndices):
    """
    This is an implementation of the Ford-Fulkerson method for finding
    a maximum flow in a flow network applied to a bipartite graph.
    Specifically, it is the Edmonds-Karp algorithm, since it uses a
    BFS to find the shortest augmenting path at each iteration, if one
    exists.

    The implementation here is an adaptation of an algorithm described in
    "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662.

    :param graph: a flow network in adjacency-list form
    :param shareIndices: the node indices that represent shares

    :returns: dict mapping share index -> matched peer index, or None
        when the share's only residual edge points at the sink.
    """

    if graph == []:
        return {}

    dim = len(graph)
    flow_function = [[0 for sh in range(dim)] for s in range(dim)]
    residual_graph, residual_function = residual_network(graph, flow_function)

    while True:
        # Compute the augmenting path once per iteration; the original
        # ran the BFS twice (once in the loop test, once for the path).
        path = augmenting_path_for(residual_graph)
        if not path:
            break
        # Delta is the largest amount that we can increase flow across
        # all of the edges in path. Because of the way that the residual
        # function is constructed, f[u][v] for a particular edge (u, v)
        # is the amount of unused capacity on that edge. Taking the
        # minimum of a list of those values for each edge in the
        # augmenting path gives us our delta.
        delta = min(residual_function[u][v] for (u, v) in path)
        for (u, v) in path:
            flow_function[u][v] += delta
            flow_function[v][u] -= delta
        # Rebuild the residual network once per augmenting path. The
        # original rebuilt it after every edge of the path; the final
        # state is identical, but each rebuild is O(V^2), so this saves
        # a factor of the path length.
        residual_graph, residual_function = residual_network(graph, flow_function)

    new_mappings = {}
    for shareIndex in shareIndices:
        peer = residual_graph[shareIndex]
        if peer == [dim - 1]:
            # The share's only residual edge is to the sink: unmatched.
            new_mappings.setdefault(shareIndex, None)
        else:
            new_mappings.setdefault(shareIndex, peer[0])

    return new_mappings
179
180
181def _extract_ids(mappings):
182    shares = set()
183    peers = set()
184    for share in mappings:
185        if mappings[share] == None:
186            pass
187        else:
188            shares.add(share)
189            for item in mappings[share]:
190                peers.add(item)
191    return (peers, shares)
192
193def _distribute_homeless_shares(mappings, homeless_shares, peers_to_shares):
194    """
195    Shares which are not mapped to a peer in the maximum spanning graph
196    still need to be placed on a server. This function attempts to
197    distribute those homeless shares as evenly as possible over the
198    available peers. If possible a share will be placed on the server it was
199    originally on, signifying the lease should be renewed instead.
200    """
201    #print "mappings, homeless_shares, peers_to_shares %s %s %s" % (mappings, homeless_shares, peers_to_shares)
202    servermap_peerids = set([key for key in peers_to_shares])
203    servermap_shareids = set()
204    for key in sorted(peers_to_shares.keys()):
205        # XXX maybe sort?
206        for share in peers_to_shares[key]:
207            servermap_shareids.add(share)
208
209    # First check to see if the leases can be renewed.
210    to_distribute = set()
211    for share in homeless_shares:
212        if share in servermap_shareids:
213            for peerid in peers_to_shares:
214                if share in peers_to_shares[peerid]:
215                    mappings[share] = set([peerid])
216                    break
217        else:
218            to_distribute.add(share)
219    # This builds a priority queue of peers with the number of shares
220    # each peer holds as the priority.
221    priority = {}
222    pQueue = PriorityQueue()
223    for peerid in servermap_peerids:
224        priority.setdefault(peerid, 0)
225    for share in mappings:
226        if mappings[share] is not None:
227            for peer in mappings[share]:
228                if peer in servermap_peerids:
229                    priority[peer] += 1
230    if priority == {}:
231        return
232    for peerid in priority:
233        pQueue.put((priority[peerid], peerid))
234    # Distribute the shares to peers with the lowest priority.
235    for share in to_distribute:
236        peer = pQueue.get()
237        mappings[share] = set([peer[1]])
238        pQueue.put((peer[0]+1, peer[1]))
239
240def _convert_mappings(index_to_peer, index_to_share, maximum_graph):
241    """
242    Now that a maximum spanning graph has been found, convert the indexes
243    back to their original ids so that the client can pass them to the
244    uploader.
245    """
246
247    converted_mappings = {}
248    for share in maximum_graph:
249        peer = maximum_graph[share]
250        if peer == None:
251            converted_mappings.setdefault(index_to_share[share], None)
252        else:
253            converted_mappings.setdefault(index_to_share[share], set([index_to_peer[peer]]))
254    return converted_mappings
255
256
def _servermap_flow_graph(peers, shares, servermap):
    """
    Generates a flow network of peerIndices to shareIndices from a server map
    of 'peer' -> ['shares']. According to Wikipedia, "a flow network is a
    directed graph where each edge has a capacity and each edge receives a flow.
    The amount of flow on an edge cannot exceed the capacity of the edge." This
    is necessary because in order to find the maximum spanning, the Edmonds-Karp algorithm
    converts the problem into a maximum flow problem.

    :param peers: a set of peer ids
    :param shares: a set of share ids
    :param servermap: dict mapping peer -> shares already held there

    :returns: adjacency-list flow network: index 0 is the source
        (connected to every peer), then peer nodes, then share nodes
        (each connected to the sink), and a final empty list for the
        sink. Returns [] when servermap is empty.
    """
    if servermap == {}:
        return []

    # Index layout matches _calculate_mappings: peers start at 1,
    # shares start right after the peers.
    peer_to_index, index_to_peer = _reindex(peers, 1)
    share_to_index, index_to_share = _reindex(shares, len(peers) + 1)
    graph = []
    # NOTE(review): 'indexedShares' is created ONCE and the same list
    # object is inserted for every peer below, so shares accumulate
    # across iterations and every peer's row aliases one shared list.
    # That looks unintended (one would expect a fresh per-peer list) --
    # confirm against callers/tests before changing it.
    indexedShares = []
    sink_num = len(peers) + len(shares) + 1
    # Source node (index 0): edges to every peer.
    graph.append([peer_to_index[peer] for peer in peers])
    for peer in peers:
        if peer in servermap:
            for s in servermap[peer]:
                if s in share_to_index:
                    indexedShares.append(share_to_index[s])
        graph.insert(peer_to_index[peer], indexedShares)
    # Every share node gets a single edge to the sink.
    for share in shares:
        graph.insert(share_to_index[share], [sink_num])
    # Sink node: no outgoing edges.
    graph.append([])
    return graph
287
288
289def _reindex(items, base):
290    """
291    I take an iteratble of items and give each item an index to be used in
292    the construction of a flow network. Indices for these items start at base
293    and continue to base + len(items) - 1.
294
295    I return two dictionaries: ({item: index}, {index: item})
296    """
297    item_to_index = {}
298    index_to_item = {}
299    for item in items:
300        item_to_index.setdefault(item, base)
301        index_to_item.setdefault(base, item)
302        base += 1
303    return (item_to_index, index_to_item)
304
305
306def _flow_network(peerIndices, shareIndices):
307    """
308    Given set of peerIndices and a set of shareIndices, I create a flow network
309    to be used by _compute_maximum_graph. The return value is a two
310    dimensional list in the form of a flow network, where each index represents
311    a node, and the corresponding list represents all of the nodes it is connected
312    to.
313
314    This function is similar to allmydata.util.happinessutil.flow_network_for, but
315    we connect every peer with all shares instead of reflecting a supplied servermap.
316    """
317    graph = []
318    # The first entry in our flow network is the source.
319    # Connect the source to every server.
320    graph.append(peerIndices)
321    sink_num = len(peerIndices + shareIndices) + 1
322    # Connect every server with every share it can possibly store.
323    for peerIndex in peerIndices:
324        graph.insert(peerIndex, shareIndices)
325    # Connect every share with the sink.
326    for shareIndex in shareIndices:
327        graph.insert(shareIndex, [sink_num])
328    # Add an empty entry for the sink.
329    graph.append([])
330    return graph
331
def share_placement(peers, readonly_peers, shares, peers_to_shares):
    """
    Generates the allocations the upload should use, based on the given
    information. We construct a dictionary of 'share_num' ->
    'server_id' and return it to the caller. Existing allocations
    appear as placements because attempting to place an existing
    allocation will renew the share.

    For more information on the algorithm this class implements, refer to
    docs/specifications/servers-of-happiness.rst

    :param peers: set of all peer ids
    :param readonly_peers: subset of peers that cannot accept new shares
    :param shares: set of share numbers to place
    :param peers_to_shares: dict mapping peer -> shares it already holds

    :returns: dict mapping share -> peer id (empty if there are no peers)
    """
    if not peers:
        return dict()

    # First calculate share placement for the readonly servers.
    readonly_shares = set()
    readonly_map = {}
    for peer in sorted(peers_to_shares.keys()):
        if peer in readonly_peers:
            readonly_map.setdefault(peer, peers_to_shares[peer])
            for share in peers_to_shares[peer]:
                readonly_shares.add(share)

    readonly_mappings = _calculate_mappings(readonly_peers, readonly_shares, readonly_map)
    used_peers, used_shares = _extract_ids(readonly_mappings)

    # Calculate share placement for the remaining existing allocations
    new_peers = set(peers) - used_peers
    new_shares = shares - used_shares

    servermap = peers_to_shares.copy()
    for peer in sorted(peers_to_shares.keys()):
        if peer in used_peers:
            servermap.pop(peer, None)
        else:
            servermap[peer] = set(servermap[peer]) - used_shares
            if not servermap[peer]:
                servermap.pop(peer, None)
                # The peer may not be present in new_peers at all (see
                # test_exception_messages_during_server_selection and
                # test_problem_layout_comment_52 in test_upload);
                # discard() is a no-op in that case, which is exactly
                # what the original try/remove/except KeyError did.
                new_peers.discard(peer)

    existing_mappings = _calculate_mappings(new_peers, new_shares, servermap)
    existing_peers, existing_shares = _extract_ids(existing_mappings)

    # Calculate share placement for the remaining peers and shares which
    # won't be preserved by existing allocations.
    new_peers = new_peers - existing_peers - used_peers
    new_shares = new_shares - existing_shares - used_shares
    new_mappings = _calculate_mappings(new_peers, new_shares)

    mappings = dict(list(readonly_mappings.items()) + list(existing_mappings.items()) + list(new_mappings.items()))
    homeless_shares = set()
    for share in mappings:
        if mappings[share] is None:
            homeless_shares.add(share)
    if homeless_shares:
        # 'servermap' should contain only read/write peers
        _distribute_homeless_shares(
            mappings, homeless_shares,
            {
                k: v
                for k, v in list(peers_to_shares.items())
                if k not in readonly_peers
            }
        )

    # now, if any share is *still* mapped to None that means "don't
    # care which server it goes on", so we place it on a round-robin
    # of read-write servers

    def round_robin(peers):
        # Yield the given peers forever, in order.
        while True:
            for peer in peers:
                yield peer
    peer_iter = round_robin(peers - readonly_peers)

    return {
        k: v.pop() if v else next(peer_iter)
        for k, v in list(mappings.items())
    }
Note: See TracBrowser for help on using the repository browser.