Ticket #1382: 1382.dpatch

File 1382.dpatch, 151.5 KB (added by kevan, at 2011-05-03T04:08:16Z)

snapshot of #1382; includes new, simplified uploader, refactored server selection logic, tests

Line 
1Sun Apr 24 16:19:50 PDT 2011  Kevan Carstensen <kevan@isnotajoke.com>
2  * immutable/upload.py: New uploader to work with an IPeerSelector object
3
4Sun Apr 24 16:20:19 PDT 2011  Kevan Carstensen <kevan@isnotajoke.com>
5  * immutable/encode.py: Alter the encoder to work with an IPeerSelector object
6
7Sun Apr 24 16:20:47 PDT 2011  Kevan Carstensen <kevan@isnotajoke.com>
8  * interfaces.py: Add IPeerSelector interfaces
9
10Sun Apr 24 16:21:16 PDT 2011  Kevan Carstensen <kevan@isnotajoke.com>
11  * test: fix existing tests to work with the new uploader + graph combination
12
13Mon May  2 20:49:26 PDT 2011  Kevan Carstensen <kevan@isnotajoke.com>
14  * util/happinessutil.py: Abstract happiness functions behind an IPeerSelector-conforming object
15
16Mon May  2 20:50:09 PDT 2011  Kevan Carstensen <kevan@isnotajoke.com>
17  * test/test_upload: Add tests for HappinessGraph object.
18
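As background for the patches below: the core idea behind the HappinessGraph they introduce can be illustrated with a short, standalone sketch. This is not part of the patch and the names in it are purely illustrative; "servers of happiness" is treated as the size of a maximum matching in the bipartite graph whose edges are (server, share number) pairs, computed here with plain augmenting paths.

    # Illustrative model of the "servers of happiness" measure. `edges` is an
    # iterable of (serverid, sharenum) pairs; the result is the size of a
    # maximum matching in that bipartite graph (Kuhn's algorithm).
    def happiness(edges):
        shares_on = {}                 # serverid -> set of share numbers
        for server, shnum in edges:
            shares_on.setdefault(server, set()).add(shnum)

        matched = {}                   # sharenum -> serverid it is matched to

        def augment(server, seen):
            # Try to pair `server` with a share, re-homing an earlier match
            # along an augmenting path if that frees a share up.
            for shnum in shares_on.get(server, ()):
                if shnum in seen:
                    continue
                seen.add(shnum)
                if shnum not in matched or augment(matched[shnum], seen):
                    matched[shnum] = server
                    return True
            return False

        return sum(1 for server in shares_on if augment(server, set()))

    # A redundant copy of share 0 on server B does not raise happiness; only
    # distinct server<->share pairings in the matching count.
    assert happiness([("A", 0), ("B", 0), ("B", 1)]) == 2

The real happinessutil/HappinessGraph code additionally has to cope with read-only servers, full servers, and servers that fail mid-upload, which is what the machinery in the patches below adds.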
19New patches:
20
21[immutable/upload.py: New uploader to work with an IPeerSelector object
22Kevan Carstensen <kevan@isnotajoke.com>**20110424231950
23 Ignore-this: fb67dcc9e75e4b87a11b77d1352c7eeb
24] {
25hunk ./src/allmydata/immutable/upload.py 16
26 from allmydata.storage.server import si_b2a
27 from allmydata.immutable import encode
28 from allmydata.util import base32, dictutil, idlib, log, mathutil
29-from allmydata.util.happinessutil import servers_of_happiness, \
30-                                         shares_by_server, merge_servers, \
31-                                         failure_message
32+from allmydata.util.happinessutil import HappinessGraph
33 from allmydata.util.assertutil import precondition
34 from allmydata.util.rrefutil import add_version_to_remote_reference
35 from allmydata.interfaces import IUploadable, IUploader, IUploadResults, \
36hunk ./src/allmydata/immutable/upload.py 74
37                  sharesize, blocksize, num_segments, num_share_hashes,
38                  storage_index,
39                  bucket_renewal_secret, bucket_cancel_secret):
40+
41         self._server = server
42         self.buckets = {} # k: shareid, v: IRemoteBucketWriter
43         self.sharesize = sharesize
44hunk ./src/allmydata/immutable/upload.py 95
45 
46     def __repr__(self):
47         return ("<ServerTracker for server %s and SI %s>"
48-                % (self._server.name(), si_b2a(self.storage_index)[:5]))
49+                % (idlib.shortnodeid_b2a(self.get_serverid()),
50+                   si_b2a(self.storage_index)[:5]))
51 
52     def get_serverid(self):
53         return self._server.get_serverid()
54hunk ./src/allmydata/immutable/upload.py 117
55 
56     def ask_about_existing_shares(self):
57         rref = self._server.get_rref()
58-        return rref.callRemote("get_buckets", self.storage_index)
59+        return rref.callRemote("get_buckets",
60+                               self.storage_index)
61 
62     def _got_reply(self, (alreadygot, buckets)):
63         #log.msg("%s._got_reply(%s)" % (self, (alreadygot, buckets)))
64hunk ./src/allmydata/immutable/upload.py 156
65     return "%s: %s" % (shnum, idlib.shortnodeid_b2a(bucketwriter._nodeid),)
66 
67 class Tahoe2ServerSelector(log.PrefixingLogMixin):
68-
69-    def __init__(self, upload_id, logparent=None, upload_status=None):
70+    """
71+    I'm responsible for selecting peers for a particular upload or
72+    repair operation. I stick with the operation until it is done.
73+    """
74+    # So how do we want this to work? Well, we're basically looking at
75+    # two tasks: repairing and uploading, since these are the two tasks
76+    # that place shares on the grid.
77+    #
78+    # When we're uploading, we're placing shares on the grid that were
79+    # never there before. In this case, we're going to be fairly strict
80+    # about satisfying our distribution criterion.
81+    #
82+    # When we're repairing, we're replacing shares that were there
83+    # before. So we're interested in finding as many of those previously
84+    # existing shares as we can, integrating them into our matching, and
85+    # replacing shares intelligently. We may also want to proceed even
86+    # in the case that we can't distribute shares as well as we might
87+    # like, because doing so will improve the health of the file.
88+    #
89+    # Operations like repairs, check and repairs, and so on should be
90+    # able to hint to us explicitly that we'll be working in the repair
91+    # state, in which case we'll be circumspect about declaring failure
92+    # and looking for other shares. Otherwise, we'll assume that we're
93+    # in the normal upload state, in which case we'll be fairly lax
94+    # about looking for pre-existing shares, and strict about declaring
95+    # success or failure.
96+    #
97+    # The parent upload process will initialize us with as much
98+    # information as it has. We'll then generate a matching based on
99+    # that information and return that to the caller. If the client
100+    # decides to go ahead with the upload, they'll keep me around for
101+    # the upload process. When they lose peers, they'll tell us about
102+    # them and we'll tell them whether they should continue with the
103+    # upload or not. We'll also record things for the upload status at
104+    # the end of the process.
105+    def __init__(self, upload_id, logparent=None, upload_status=None,
106+                 repair=False):
107         self.upload_id = upload_id
108         self.query_count, self.good_query_count, self.bad_query_count = 0,0,0
109         # Servers that are working normally, but full.
110hunk ./src/allmydata/immutable/upload.py 202
111         self.last_failure_msg = None
112         self._status = IUploadStatus(upload_status)
113         log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id)
114+        # Don't start in the repair state by default.
115+        #
116+        # Note that things that we see during our selection process can
117+        # influence our decision about whether we're in a repair mode or
118+        # not.
119+        self.repair = repair
120+
121+        # Trigger file discovery behavior if necessary.
122+        self._first_time = True
123+        self._needs_repair_check = False
124+
125         self.log("starting", level=log.OPERATIONAL)
126 
127     def __repr__(self):
128hunk ./src/allmydata/immutable/upload.py 223
129                          num_segments, total_shares, needed_shares,
130                          servers_of_happiness):
131         """
132-        @return: (upload_trackers, already_servers), where upload_trackers is
133-                 a set of ServerTracker instances that have agreed to hold
134-                 some shares for us (the shareids are stashed inside the
135-                 ServerTracker), and already_servers is a dict mapping shnum
136-                 to a set of serverids which claim to already have the share.
137+        @return: A Deferred that fires when we're done selecting.
138+                 The caller can then ask me for my write buckets.
139         """
140hunk ./src/allmydata/immutable/upload.py 226
141+        # How does this process work?
142+        #
143+        # Well, essentially, we know that we have shares 1-N to
144+        # distribute. And we know we have some servers in the storage
145+        # broker. Given that, we want to find a nice matching. But how?
146+        # How does the current one work?
147+        #
148+        #  - Prunes the list of servers by those that can store our blocks.
149+        #  - Takes the first few of those servers.
150+        #  - Tries to store shares on those servers.
151+        #  - Checks.
152+        #
153+        # Not that great. How should it work?
154+        #   - Prunes the list of servers by those that can store our
155+        #     blocks.
156+        #   - Ask some proportion of those servers DYHB. Small
157+        #     proportion if an upload, larger and more thorough proportion if
158+        #     not an upload. If upload and we find existing shares, enter
159+        #     the repair state and look more thoroughly.
160+        #   - Rework happinessutil. It should take a list of servers, a
161+        #     list of shares, and a list of edges that we found in the
162+        #     previous step, then compute the maximum matching, and give
163+        #     us a list of servers that we need to try to allocate
164+        #     buckets on.
165+        #   - Allocate buckets. If unsuccessful, goto the happinessutil
166+        #     and remove the failed server.
167+        #   - Once we're done, return the list to our caller.
168 
169         if self._status:
170             self._status.set_status("Contacting Servers..")
171hunk ./src/allmydata/immutable/upload.py 261
172         self.servers_of_happiness = servers_of_happiness
173         self.needed_shares = needed_shares
174 
175-        self.homeless_shares = set(range(total_shares))
176-        self.contacted_trackers = [] # servers worth asking again
177-        self.contacted_trackers2 = [] # servers that we have asked again
178-        self._started_second_pass = False
179+        self.contacted_trackers = set() # used to make an error message.
180         self.use_trackers = set() # ServerTrackers that have shares assigned
181                                   # to them
182         self.preexisting_shares = {} # shareid => set(serverids) holding shareid
183hunk ./src/allmydata/immutable/upload.py 265
184-
185-        # These servers have shares -- any shares -- for our SI. We keep
186-        # track of these to write an error message with them later.
187-        self.serverids_with_shares = set()
188-
189+        self.allocated_shares = {} # XXX: Dubious whether this is necessary
190         # this needed_hashes computation should mirror
191         # Encoder.send_all_share_hash_trees. We use an IncompleteHashTree
192         # (instead of a HashTree) because we don't require actual hashing
193hunk ./src/allmydata/immutable/upload.py 279
194                                              None)
195         allocated_size = wbp.get_allocated_size()
196         all_servers = storage_broker.get_servers_for_psi(storage_index)
197+
198         if not all_servers:
199             raise NoServersError("client gave us zero servers")
200 
201hunk ./src/allmydata/immutable/upload.py 291
202             v0 = server.get_rref().version
203             v1 = v0["http://allmydata.org/tahoe/protocols/storage/v1"]
204             return v1["maximum-immutable-share-size"]
205+
206+        # We'll ask these peers to store shares later.
207         writable_servers = [server for server in all_servers
208                             if _get_maxsize(server) >= allocated_size]
209hunk ./src/allmydata/immutable/upload.py 295
210-        readonly_servers = set(all_servers[:2*total_shares]) - set(writable_servers)
211 
212         # decide upon the renewal/cancel secrets, to include them in the
213         # allocate_buckets query.
214hunk ./src/allmydata/immutable/upload.py 304
215         file_renewal_secret = file_renewal_secret_hash(client_renewal_secret,
216                                                        storage_index)
217         file_cancel_secret = file_cancel_secret_hash(client_cancel_secret,
218+
219                                                      storage_index)
220hunk ./src/allmydata/immutable/upload.py 306
221-        def _make_trackers(servers):
222-            trackers = []
223-            for s in servers:
224-                seed = s.get_lease_seed()
225-                renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
226-                cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
227-                st = ServerTracker(s,
228-                                   share_size, block_size,
229-                                   num_segments, num_share_hashes,
230-                                   storage_index,
231-                                   renew, cancel)
232-                trackers.append(st)
233-            return trackers
234-        self.uncontacted_trackers = _make_trackers(writable_servers)
235+        self._trackers = {}
236+        for server in all_servers:
237+            seed = server.get_lease_seed()
238+            renew = bucket_renewal_secret_hash(file_renewal_secret, seed)
239+            cancel = bucket_cancel_secret_hash(file_cancel_secret, seed)
240+            self._trackers[server.get_serverid()] = ServerTracker(server,
241+                                           share_size, block_size,
242+                                           num_segments, num_share_hashes,
243+                                           storage_index,
244+                                           renew, cancel)
245 
246hunk ./src/allmydata/immutable/upload.py 317
247-        # We don't try to allocate shares to these servers, since they've
248-        # said that they're incapable of storing shares of the size that we'd
249-        # want to store. We ask them about existing shares for this storage
250-        # index, which we want to know about for accurate
251-        # servers_of_happiness accounting, then we forget about them.
252-        readonly_trackers = _make_trackers(readonly_servers)
253+        # We keep track of serverids separately, since the
254+        # HappinessGraph only cares about ids, not necessarily how we
255+        # choose to implement our server abstractions.
256+        self.writable_serverids = [x.get_serverid() for x in writable_servers]
257+        self.all_serverids = [x.get_serverid() for x in all_servers]
258+        # We return this to the client. It fires when we're done,
259+        # returning our graph instance.
260+        self._done = defer.Deferred()
261+
262+        self._graph = HappinessGraph(needed_shares,
263+                                     servers_of_happiness,
264+                                     total_shares)
265+
266+        self._place_shares()
267+
268+        # We'll fire self._done once we know that we're done.
269+        return self._done
270+
271+
272+    def _find_existing_shares(self, ignored=None):
273+        """
274+        I poll some of the peers that I know about for existing shares.
275+        I fire with None when I'm done doing that.
276+        """
277+        self.log("Locating existing shares")
278+
279+        if (self.repair and not self._needs_repair_check) or \
280+           (not self.repair and not self._first_time):
281+            self.log("no existing shares found on the last pass, continuing")
282+            return
283+
284+        if self.repair and self._needs_repair_check:
285+            self.log("in repair mode, checking all servers for existing shares")
286+            servers_to_ask = set(self.all_serverids)
287+            self._needs_repair_check = False
288+
289+        else:
290+            assert self._first_time
291+            servers_to_ask = \
292+                set(self.all_serverids).difference(set(self.writable_serverids))
293+            self.log("checking %d readonly servers" % len(servers_to_ask))
294+
295+        self._first_time = False
296+
297+        # don't ask peers more than once
298+        servers_to_ask.difference_update(self.contacted_trackers)
299+
300+        servers_to_ask = map(lambda x: self._trackers[x], servers_to_ask)
301+
302+        responses = []
303+        for server in servers_to_ask:
304+            self.log("Asking server %s for any shares that it already has"
305+                     % (idlib.shortnodeid_b2a(server.get_serverid())))
306+            d = server.ask_about_existing_shares()
307+            self.contacted_trackers.add(server.get_serverid())
308+            d.addBoth(self._handle_existing_response, server)
309+            responses.append(d)
310 
311hunk ./src/allmydata/immutable/upload.py 375
312-        # We now ask servers that can't hold any new shares about existing
313-        # shares that they might have for our SI. Once this is done, we
314-        # start placing the shares that we haven't already accounted
315-        # for.
316-        ds = []
317-        if self._status and readonly_trackers:
318-            self._status.set_status("Contacting readonly servers to find "
319-                                    "any existing shares")
320-        for tracker in readonly_trackers:
321-            assert isinstance(tracker, ServerTracker)
322-            d = tracker.ask_about_existing_shares()
323-            d.addBoth(self._handle_existing_response, tracker)
324-            ds.append(d)
325             self.num_servers_contacted += 1
326             self.query_count += 1
327hunk ./src/allmydata/immutable/upload.py 377
328-            self.log("asking server %s for any existing shares" %
329-                     (tracker.name(),), level=log.NOISY)
330-        dl = defer.DeferredList(ds)
331-        dl.addCallback(lambda ign: self._loop())
332-        return dl
333 
334hunk ./src/allmydata/immutable/upload.py 378
335+        d = defer.DeferredList(responses)
336+        # If we found any existing shares and need to look more
337+        # thoroughly in another pass, this will catch it.
338+        d.addCallback(self._find_existing_shares)
339+        return d
340 
341hunk ./src/allmydata/immutable/upload.py 384
342-    def _handle_existing_response(self, res, tracker):
343+    def _handle_existing_response(self, res, server):
344         """
345         I handle responses to the queries sent by
346hunk ./src/allmydata/immutable/upload.py 387
347-        Tahoe2ServerSelector._existing_shares.
348+        Tahoe2ServerSelector._find_existing_shares.
349         """
350hunk ./src/allmydata/immutable/upload.py 389
351-        serverid = tracker.get_serverid()
352         if isinstance(res, failure.Failure):
353hunk ./src/allmydata/immutable/upload.py 390
354+            serverid = server.get_serverid()
355             self.log("%s got error during existing shares check: %s"
356hunk ./src/allmydata/immutable/upload.py 392
357-                    % (tracker.name(), res), level=log.UNUSUAL)
358+                    % (idlib.shortnodeid_b2a(serverid), res),
359+                    level=log.UNUSUAL)
360+            self._remove_server(server)
361             self.error_count += 1
362             self.bad_query_count += 1
363hunk ./src/allmydata/immutable/upload.py 397
364+            self.last_failure_msg = ("last failure (from %s) was: %s" % \
365+                                     (server, res))
366+
367         else:
368             buckets = res
369hunk ./src/allmydata/immutable/upload.py 402
370-            if buckets:
371-                self.serverids_with_shares.add(serverid)
372             self.log("response to get_buckets() from server %s: alreadygot=%s"
373hunk ./src/allmydata/immutable/upload.py 403
374-                    % (tracker.name(), tuple(sorted(buckets))),
375+                    % (server.name(), tuple(sorted(buckets))),
376                     level=log.NOISY)
377             for bucket in buckets:
378hunk ./src/allmydata/immutable/upload.py 406
379-                self.preexisting_shares.setdefault(bucket, set()).add(serverid)
380-                self.homeless_shares.discard(bucket)
381-            self.full_count += 1
382-            self.bad_query_count += 1
383+                # Record the edge (server, shnum) in the happiness
384+                # graph so that it is remembered and taken into
385+                # account when we compute share assignments for this
386+                # upload later on.
387+                serverid = server.get_serverid()
388+                self._graph.add_server_with_share(serverid, bucket)
389+                if not self.repair:
391+                    self.repair = True
391+                    self._needs_repair_check = True
392 
393hunk ./src/allmydata/immutable/upload.py 416
394+            if buckets:
395+                self.good_query_count += 1
396+
397+            else:
398+                self.bad_query_count += 1
399+                self.full_count += 1
400+
401+
402+    def _place_shares(self):
403+        """
404+        I handle the process of placing shares.
405+        """
406+        # Share placement is one or more iterations of four steps.
407+        #
408+        # 1: Compute a matching in a bipartite graph.
409+        # 2: Verify the suitability of the matching for what we're
410+        #    doing.
411+        # 3: Create any write buckets that are needed for the placement.
412+        # 4: Create write buckets for shares that are not already
413+        #    placed.
414+        #
415+        # Step 2 can vary depending on what we're doing. In a repair,
416+        # for example, we may want to allow for any matching (so long as
417+        # there is some server out there that we can upload to) because
418+        # placing shares on the grid, even if they are not optimally
419+        # distributed, can help the health of the file.
420+        #
421+        # Step 1 should not fail under any circumstances. If step 2
422+        # fails, the process will abort with a failure message to the
423+        # caller. Step 3 may fail. If it does, we'll remove the failing
424+        # peer from our graph and return to step 1. If step 4 fails, we
425+        # may care, but probably won't. When we're done with these
426+        # steps, we'll fire self._done with the graph that it can use to
427+        # check everything as the process continues.
428+        #
429+        # Note that this loop will eventually terminate -- we have a
430+        # finite number of peers, and we guarantee that we remove at
431+        # least one peer from the graph on each failure in step 3, so we
432+        # will eventually run out of peers.
433+        self.log("Attempting to place shares")
434+
435+        if (self.repair and self._needs_repair_check) or self._first_time:
436+            d = self._find_existing_shares()
437 
438hunk ./src/allmydata/immutable/upload.py 460
439-    def _get_progress_message(self):
440-        if not self.homeless_shares:
441-            msg = "placed all %d shares, " % (self.total_shares)
442         else:
443hunk ./src/allmydata/immutable/upload.py 461
444-            msg = ("placed %d shares out of %d total (%d homeless), " %
445-                   (self.total_shares - len(self.homeless_shares),
446-                    self.total_shares,
447-                    len(self.homeless_shares)))
448-        return (msg + "want to place shares on at least %d servers such that "
449-                      "any %d of them have enough shares to recover the file, "
450-                      "sent %d queries to %d servers, "
451-                      "%d queries placed some shares, %d placed none "
452-                      "(of which %d placed none due to the server being"
453-                      " full and %d placed none due to an error)" %
454-                        (self.servers_of_happiness, self.needed_shares,
455-                         self.query_count, self.num_servers_contacted,
456-                         self.good_query_count, self.bad_query_count,
457-                         self.full_count, self.error_count))
458+            d = fireEventually()
459 
460hunk ./src/allmydata/immutable/upload.py 463
461+        d.addCallback(lambda ignored: self._compute_matching())
462+        d.addCallback(self._check_matching)
463+        d.addCallback(self._make_write_buckets)
464+        d.addBoth(self._check_for_done)
465+        return d
466 
467hunk ./src/allmydata/immutable/upload.py 469
468-    def _loop(self):
469-        if not self.homeless_shares:
470-            merged = merge_servers(self.preexisting_shares, self.use_trackers)
471-            effective_happiness = servers_of_happiness(merged)
472-            if self.servers_of_happiness <= effective_happiness:
473-                msg = ("server selection successful for %s: %s: pretty_print_merged: %s, "
474-                       "self.use_trackers: %s, self.preexisting_shares: %s") \
475-                       % (self, self._get_progress_message(),
476-                          pretty_print_shnum_to_servers(merged),
477-                          [', '.join([str_shareloc(k,v)
478-                                      for k,v in st.buckets.iteritems()])
479-                           for st in self.use_trackers],
480-                          pretty_print_shnum_to_servers(self.preexisting_shares))
481-                self.log(msg, level=log.OPERATIONAL)
482-                return (self.use_trackers, self.preexisting_shares)
483-            else:
484-                # We're not okay right now, but maybe we can fix it by
485-                # redistributing some shares. In cases where one or two
486-                # servers has, before the upload, all or most of the
487-                # shares for a given SI, this can work by allowing _loop
488-                # a chance to spread those out over the other servers,
489-                delta = self.servers_of_happiness - effective_happiness
490-                shares = shares_by_server(self.preexisting_shares)
491-                # Each server in shares maps to a set of shares stored on it.
492-                # Since we want to keep at least one share on each server
493-                # that has one (otherwise we'd only be making
494-                # the situation worse by removing distinct servers),
495-                # each server has len(its shares) - 1 to spread around.
496-                shares_to_spread = sum([len(list(sharelist)) - 1
497-                                        for (server, sharelist)
498-                                        in shares.items()])
499-                if delta <= len(self.uncontacted_trackers) and \
500-                   shares_to_spread >= delta:
501-                    items = shares.items()
502-                    while len(self.homeless_shares) < delta:
503-                        # Loop through the allocated shares, removing
504-                        # one from each server that has more than one
505-                        # and putting it back into self.homeless_shares
506-                        # until we've done this delta times.
507-                        server, sharelist = items.pop()
508-                        if len(sharelist) > 1:
509-                            share = sharelist.pop()
510-                            self.homeless_shares.add(share)
511-                            self.preexisting_shares[share].remove(server)
512-                            if not self.preexisting_shares[share]:
513-                                del self.preexisting_shares[share]
514-                            items.append((server, sharelist))
515-                        for writer in self.use_trackers:
516-                            writer.abort_some_buckets(self.homeless_shares)
517-                    return self._loop()
518-                else:
519-                    # Redistribution won't help us; fail.
520-                    server_count = len(self.serverids_with_shares)
521-                    failmsg = failure_message(server_count,
522-                                              self.needed_shares,
523-                                              self.servers_of_happiness,
524-                                              effective_happiness)
525-                    servmsgtempl = "server selection unsuccessful for %r: %s (%s), merged=%s"
526-                    servmsg = servmsgtempl % (
527-                        self,
528-                        failmsg,
529-                        self._get_progress_message(),
530-                        pretty_print_shnum_to_servers(merged)
531-                        )
532-                    self.log(servmsg, level=log.INFREQUENT)
533-                    return self._failed("%s (%s)" % (failmsg, self._get_progress_message()))
534 
535hunk ./src/allmydata/immutable/upload.py 470
536-        if self.uncontacted_trackers:
537-            tracker = self.uncontacted_trackers.pop(0)
538-            # TODO: don't pre-convert all serverids to ServerTrackers
539-            assert isinstance(tracker, ServerTracker)
540+    def _compute_matching(self, ignored=None):
541+        """
542+        I fire when we know about a decent number of the existing
543+        shares. I compute a matching given this information. I may be
544+        called from a Deferred context, but can compute my results
545+        synchronously and do not return a deferred.
546+        """
547+        # How does this work?
548+        #
549+        # Well, at this point we've got some shares, some servers that
550+        # we can write to, and some preexisting shares, represented as
551+        # server-share edges. What do we want to do?
552+        #
553+        # - Compute a graph that represents the possibility of our
554+        #   shares and writable servers.
555+        #   
556+        #   The servers of happiness check is currently written to take a
557+        #   sharemap -- a dict mapping a peer to a set of shares. We
558+        #   should assess whether this is still the best option for what
559+        #   we want to do. What we really want to do is split up the
560+        #   routines to check and to build the graph. Specifically, we
561+        #   want to define a complete graph for the set of peers that we
562+        #   know we can write to and the set of shares. Then we want to
563+        #   add edges to that graph. Then we want to run the check on
564+        #   the graph.
565+        #
566+        #   This induces the following breakdown of methods, or
567+        #   something like it:
568+        #
569+        #     - make_complete_bipartite_graph(shares, peers)
570+        #     - add_edge(edge, graph)
571+        #       - Or make the graph an object.
572+        #
573+        #     - servers_of_happiness(graph)
574+        #       - Or make a method of the graph object.
575+        #
576+        self.log("Computing matching")
577 
578hunk ./src/allmydata/immutable/upload.py 508
579-            shares_to_ask = set(sorted(self.homeless_shares)[:1])
580-            self.homeless_shares -= shares_to_ask
581-            self.query_count += 1
582-            self.num_servers_contacted += 1
583-            if self._status:
584-                self._status.set_status("Contacting Servers [%s] (first query),"
585-                                        " %d shares left.."
586-                                        % (tracker.name(),
587-                                           len(self.homeless_shares)))
588-            d = tracker.query(shares_to_ask)
589-            d.addBoth(self._got_response, tracker, shares_to_ask,
590-                      self.contacted_trackers)
591-            return d
592-        elif self.contacted_trackers:
593-            # ask a server that we've already asked.
594-            if not self._started_second_pass:
595-                self.log("starting second pass",
596-                        level=log.NOISY)
597-                self._started_second_pass = True
598-            num_shares = mathutil.div_ceil(len(self.homeless_shares),
599-                                           len(self.contacted_trackers))
600-            tracker = self.contacted_trackers.pop(0)
601-            shares_to_ask = set(sorted(self.homeless_shares)[:num_shares])
602-            self.homeless_shares -= shares_to_ask
603+        # Only give the peer selector as many peers as it would need to
604+        # maximize happiness. This helps to ensure that the shares are
605+        # where the downloader expects them to be.
606+        self.log("adding %d servers to the graph" % len(self.writable_serverids))
607+        self._graph.add_servers(self.writable_serverids)
608+
609+        # Our successors will check this result to see that it is okay
610+        # -- our only job is to compute it.
611+        return self._graph.get_share_assignments()
612+
613+
614+    def _check_matching(self, share_assignments):
615+        """
616+        I check that the matching is okay.
617+        """
618+        #self.log("Done computing matching and share assignments, "
619+        #         "got happiness %d" % happiness)
620+        if self._graph.is_healthy():
621+            # Try to repair regardless of how healthy the file will be
622+            # when we're done, since it will be more healthy than it
623+            # was.
624+            self.log("Upload is correctly distributed, continuing")
625+            return share_assignments
626+
627+        if self.repair and len(share_assignments) >= 1:
628+            # If we have a matching at all (meaning that there is at
629+            # least one peer still out there), return True -- we'll be
630+            # helping the file availability.
631+            self.log("Upload is not correctly distributed, but we're "
632+                     "repairing so we'll continue the process anyway")
633+            return share_assignments
634+
635+        # Otherwise, refuse to upload
636+        self.log("Upload fails happiness criterion and we're not repairing, "
637+                 "dying")
638+
639+        # Get a failure message from the graph.
640+        msg = self._graph.get_failure_message()
641+        msg = "server selection failed for %s: %s (%s)" % \
642+                (self, msg, self._get_progress_message())
643+
644+        # We keep track of the failures that we see, and include the
645+        # last one in the failure message if we have it. This can help
646+        # in debugging.
647+        if self.last_failure_msg:
648+            msg += "\nlast failure was: %s" % self.last_failure_msg
649+
650+        raise UploadUnhappinessError(msg)
651+
652+
653+    def _make_write_buckets(self, share_assignments):
654+        """
655+        I create the write buckets needed for the given share assignments
656+        and return them so that my caller can use them to upload shares.
657+        """
658+        # So here we're going to ask self._graph for the edges that it used
659+        # in its matching. For each of those edges, we need to ensure
660+        # that the peer holds the share. If the peer already holds the
661+        # share, then we don't have to worry about it. Otherwise, we've
662+        # got to use the ServerTracker to ask it to hold the share.
663+        self.log("Got share assignments, starting to allocate shares")
664+
665+        ds = []
666+
667+        for server in share_assignments:
668+            server_tracker = self._trackers[server]
669+
670+            shares = set(share_assignments[server])
671+            d = server_tracker.query(shares)
672             self.query_count += 1
673hunk ./src/allmydata/immutable/upload.py 578
674-            if self._status:
675-                self._status.set_status("Contacting Servers [%s] (second query),"
676-                                        " %d shares left.."
677-                                        % (tracker.name(),
678-                                           len(self.homeless_shares)))
679-            d = tracker.query(shares_to_ask)
680-            d.addBoth(self._got_response, tracker, shares_to_ask,
681-                      self.contacted_trackers2)
682-            return d
683-        elif self.contacted_trackers2:
684-            # we've finished the second-or-later pass. Move all the remaining
685-            # servers back into self.contacted_trackers for the next pass.
686-            self.contacted_trackers.extend(self.contacted_trackers2)
687-            self.contacted_trackers2[:] = []
688-            return self._loop()
689-        else:
690-            # no more servers. If we haven't placed enough shares, we fail.
691-            merged = merge_servers(self.preexisting_shares, self.use_trackers)
692-            effective_happiness = servers_of_happiness(merged)
693-            if effective_happiness < self.servers_of_happiness:
694-                msg = failure_message(len(self.serverids_with_shares),
695-                                      self.needed_shares,
696-                                      self.servers_of_happiness,
697-                                      effective_happiness)
698-                msg = ("server selection failed for %s: %s (%s)" %
699-                       (self, msg, self._get_progress_message()))
700-                if self.last_failure_msg:
701-                    msg += " (%s)" % (self.last_failure_msg,)
702-                self.log(msg, level=log.UNUSUAL)
703-                return self._failed(msg)
704-            else:
705-                # we placed enough to be happy, so we're done
706-                if self._status:
707-                    self._status.set_status("Placed all shares")
708-                msg = ("server selection successful (no more servers) for %s: %s: %s" % (self,
709-                            self._get_progress_message(), pretty_print_shnum_to_servers(merged)))
710-                self.log(msg, level=log.OPERATIONAL)
711-                return (self.use_trackers, self.preexisting_shares)
712+            self.contacted_trackers.add(server_tracker.get_serverid())
713+            d.addBoth(self._handle_response, server_tracker,
714+                      shares)
715+            ds.append(d)
716+
717+        # Once we satisfy the dispersal requirement, we also need to
718+        # check for any shares that didn't get used in the maximal
719+        # matching and put those somewhere.
720+        return defer.DeferredList(ds)
721+
722+
723+    def _check_for_done(self, results):
724+        """
725+        I check to see whether we're done. If we are, I fire self._done
726+        with our graph instance. If we aren't, I cause self._done to
727+        errback with a message explaining why.
728+        """
729+        self.log("Done asking servers; checking to see if we're done")
730+        if isinstance(results, failure.Failure):
731+            # We're checking for doneness because of an exception, most
732+            # likely a happiness error. Raised and uncaught exceptions
733+            # at this point signify upload failure, so we'll pass them
734+            # to our caller and let them deal with it.
735+            return self._done.errback(results)
736+
737+        if self._graph.needs_recomputation():
738+            # We lost one or more servers during the last upload cycle.
739+            # Re-do the loop.
740+            return self._place_shares()
741+
742+        # Otherwise, we're done. We need to tell the graph about the
743+        # ServerTrackers that we're using so that the other methods can
744+        # get them later.
745+        self._graph.set_trackers(self.use_trackers)
746+        self._cancel_unused_assignments()
747+        self._done.callback(self._graph)
748+
749 
750hunk ./src/allmydata/immutable/upload.py 616
751-    def _got_response(self, res, tracker, shares_to_ask, put_tracker_here):
752+    def _cancel_unused_assignments(self):
753+        unused = self._graph.get_obsolete_allocations()
754+        for server in unused:
755+            t = self._trackers[server]
756+            t.abort_some_buckets(list(unused[server]))
757+
758+            # TODO: remove from self.use_trackers
759+
760+
761+    def _handle_response(self, res, server, shares_to_ask):
762         if isinstance(res, failure.Failure):
763             # This is unusual, and probably indicates a bug or a network
764             # problem.
765hunk ./src/allmydata/immutable/upload.py 629
766-            self.log("%s got error during server selection: %s" % (tracker, res),
767-                    level=log.UNUSUAL)
768+            self.log("%s got error during server selection: %s" %
769+                     (server.name(), res),
770+                     level=log.UNUSUAL)
771+            self._remove_server(server)
772             self.error_count += 1
773             self.bad_query_count += 1
774hunk ./src/allmydata/immutable/upload.py 635
775-            self.homeless_shares |= shares_to_ask
776-            if (self.uncontacted_trackers
777-                or self.contacted_trackers
778-                or self.contacted_trackers2):
779-                # there is still hope, so just loop
780-                pass
781-            else:
782-                # No more servers, so this upload might fail (it depends upon
783-                # whether we've hit servers_of_happiness or not). Log the last
784-                # failure we got: if a coding error causes all servers to fail
785-                # in the same way, this allows the common failure to be seen
786-                # by the uploader and should help with debugging
787-                msg = ("last failure (from %s) was: %s" % (tracker, res))
788-                self.last_failure_msg = msg
789+            self.last_failure_msg = ("last failure (from %s) was: %s" % \
790+                                     (server.name(), res))
791         else:
792hunk ./src/allmydata/immutable/upload.py 638
793+            # Did we allocate the shares?
794             (alreadygot, allocated) = res
795hunk ./src/allmydata/immutable/upload.py 640
796-            self.log("response to allocate_buckets() from server %s: alreadygot=%s, allocated=%s"
797-                    % (tracker.name(),
798-                       tuple(sorted(alreadygot)), tuple(sorted(allocated))),
799-                    level=log.NOISY)
800-            progress = False
801-            for s in alreadygot:
802-                self.preexisting_shares.setdefault(s, set()).add(tracker.get_serverid())
803-                if s in self.homeless_shares:
804-                    self.homeless_shares.remove(s)
805-                    progress = True
806-                elif s in shares_to_ask:
807-                    progress = True
808+            # First, we worry about query counting. We define this query as
809+            # good (or perhaps productive) if it resulted in shares being
810+            # removed from homeless shares. Otherwise, it is bad. If we
811+            # got to this point without allocating shares, we can assume
812+            # that the server is full -- or at least not accepting
813+            # shares from us -- because it still works and refused to
814+            # accept the share.
815+            shares_for_query = set(alreadygot)
816+            shares_for_query.update(set(allocated))
817 
818hunk ./src/allmydata/immutable/upload.py 650
819-            # the ServerTracker will remember which shares were allocated on
820-            # that peer. We just have to remember to use them.
821-            if allocated:
822-                self.use_trackers.add(tracker)
823-                progress = True
824+            asked_for_shares = set(shares_to_ask)
825 
826hunk ./src/allmydata/immutable/upload.py 652
827-            if allocated or alreadygot:
828-                self.serverids_with_shares.add(tracker.get_serverid())
829+            serverid = server.get_serverid()
830 
831hunk ./src/allmydata/immutable/upload.py 654
832-            not_yet_present = set(shares_to_ask) - set(alreadygot)
833-            still_homeless = not_yet_present - set(allocated)
834-
835-            if progress:
836-                # They accepted at least one of the shares that we asked
837-                # them to accept, or they had a share that we didn't ask
838-                # them to accept but that we hadn't placed yet, so this
839-                # was a productive query
840+            # If asked_for_shares is a subset of shares_for_query, then
841+            # we were productive. Otherwise, we weren't.
842+            # XXX: Not necessarily.
843+            if asked_for_shares.issubset(shares_for_query):
844                 self.good_query_count += 1
845hunk ./src/allmydata/immutable/upload.py 659
846+
847             else:
848                 self.bad_query_count += 1
849                 self.full_count += 1
850hunk ./src/allmydata/immutable/upload.py 664
851 
852-            if still_homeless:
853-                # In networks with lots of space, this is very unusual and
854-                # probably indicates an error. In networks with servers that
855-                # are full, it is merely unusual. In networks that are very
856-                # full, it is common, and many uploads will fail. In most
857-                # cases, this is obviously not fatal, and we'll just use some
858-                # other servers.
859+            self.log("Response from server %s: alreadygot %s, allocated %s" %
860+                  (server.name(), alreadygot, allocated))
861+            if alreadygot:
862+                # Hm, we're finding shares for the SI. That's normal if we're
863+                # in repair mode, and says that we should be in repair mode if
864+                # we're not.
865+                if not self.repair:
866+                    self.repair = True
867+                    # If we've already done a repair check, we don't
868+                    # need to do another one.
869+                    if not self._needs_repair_check:
870+                        self._needs_repair_check = True
871+
872+                for shnum in alreadygot:
873+                    self._graph.add_server_with_share(serverid, shnum)
874+
875+            if not allocated:
876+                # Uh oh, they ran out of space. We'll remove them
877+                # from consideration, and then add any already
878+                # existing shares back in, then recompute.
879+                self._graph.mark_full_server(serverid)
880+                if serverid in self.writable_serverids:
881+                    self.writable_serverids.remove(serverid)
882+                # The graph will take care of the process of
883+                # regenerating what we've done.
884 
885hunk ./src/allmydata/immutable/upload.py 690
886-                # some shares are still homeless, keep trying to find them a
887-                # home. The ones that were rejected get first priority.
888-                self.homeless_shares |= still_homeless
889-                # Since they were unable to accept all of our requests, so it
890-                # is safe to assume that asking them again won't help.
891             else:
892hunk ./src/allmydata/immutable/upload.py 691
893-                # if they *were* able to accept everything, they might be
894-                # willing to accept even more.
895-                put_tracker_here.append(tracker)
896+                self.use_trackers.add(server)
897+                for a in allocated:
898+                    if a in self.allocated_shares:
899+                        # We need to cancel the existing PeerTracker.
900+                        t = self.allocated_shares[a]
901+                        t.abort_some_buckets([a])
902+                    self.allocated_shares[a] = server
903+
904+                    self._graph.confirm_share_allocation(serverid, a)
905+
906 
907hunk ./src/allmydata/immutable/upload.py 702
908-        # now loop
909-        return self._loop()
910+    def _remove_server(self, server):
911+        """
912+        I handle the event when a server breaks during share assignment. I
913+        remove that server from the graph, mark all of its shares as
914+        homeless, and remove any preexisting edges that I know about for
915+        that server.
916+        """
917+        self.log("in remove server for bad server %s" % server.name())
918+        serverid = server.get_serverid()
919+        self._graph.mark_bad_server(serverid)
920+
921+        if serverid in self.all_serverids:
922+            self.all_serverids.remove(serverid)
923+
924+        if serverid in self.writable_serverids:
925+            self.writable_serverids.remove(serverid)
926+
927+        if server in self.use_trackers:
928+            self.use_trackers.remove(server)
929+
930+
931+    def _get_progress_message(self):
932+        homeless_shares = self._graph.get_homeless_shares()
933+        if not homeless_shares:
934+            msg = "placed all %d shares, " % (self.total_shares)
935+        else:
936+            msg = ("placed %d shares out of %d total (%d homeless), " %
937+                   (self.total_shares - len(homeless_shares),
938+                    self.total_shares,
939+                    len(homeless_shares)))
940+        return (msg + "want to place shares on at least %d servers such that "
941+                      "any %d of them have enough shares to recover the file, "
942+                      "sent %d queries to %d servers, "
943+                      "%d queries placed some shares, %d placed none "
944+                      "(of which %d placed none due to the server being"
945+                      " full and %d placed none due to an error)" %
946+                        (self.servers_of_happiness, self.needed_shares,
947+                         self.query_count, len(self.contacted_trackers),
948+                         self.good_query_count, self.bad_query_count,
949+                         self.full_count, self.error_count))
950 
951 
952     def _failed(self, msg):
953hunk ./src/allmydata/immutable/upload.py 751
954         place shares for this file. I then raise an
955         UploadUnhappinessError with my msg argument.
956         """
957-        for tracker in self.use_trackers:
958-            assert isinstance(tracker, ServerTracker)
959-            tracker.abort()
960+        for server in self.use_trackers:
961+            assert isinstance(server, ServerTracker)
962+
963+            server.abort()
964+
965         raise UploadUnhappinessError(msg)
966 
967 
968hunk ./src/allmydata/immutable/upload.py 1127
969         d.addCallback(_done)
970         return d
971 
972-    def set_shareholders(self, (upload_trackers, already_servers), encoder):
973+    def set_shareholders(self, happiness_graph, encoder):
974         """
975hunk ./src/allmydata/immutable/upload.py 1129
976-        @param upload_trackers: a sequence of ServerTracker objects that
977-                                have agreed to hold some shares for us (the
978-                                shareids are stashed inside the ServerTracker)
979-        @paran already_servers: a dict mapping sharenum to a set of serverids
980-                                that claim to already have this share
981+        I fire when peer selection is done. I take the happiness graph
982+        that the peer selector made (which knows which peers hold which
983+        shares), and tell my encoder about that. The Encoder will then
984+        try to upload those shares, using the happiness graph to deal with
985+        failures along the way.
986         """
987hunk ./src/allmydata/immutable/upload.py 1135
988-        msgtempl = "set_shareholders; upload_trackers is %s, already_servers is %s"
989-        values = ([', '.join([str_shareloc(k,v)
990-                              for k,v in st.buckets.iteritems()])
991-                   for st in upload_trackers], already_servers)
992-        self.log(msgtempl % values, level=log.OPERATIONAL)
993-        # record already-present shares in self._results
994-        self._results.preexisting_shares = len(already_servers)
995+        # TODO: We need a way to get preexisting shares.
996+        self._server_trackers = {} # k: shnum, v: instance of PeerTracker
997+        upload_servers = happiness_graph.get_trackers()
998+        for server in upload_servers:
999+            assert isinstance(server, ServerTracker)
1000 
1001hunk ./src/allmydata/immutable/upload.py 1141
1002-        self._server_trackers = {} # k: shnum, v: instance of ServerTracker
1003-        for tracker in upload_trackers:
1004-            assert isinstance(tracker, ServerTracker)
1005         buckets = {}
1006hunk ./src/allmydata/immutable/upload.py 1142
1007-        servermap = already_servers.copy()
1008-        for tracker in upload_trackers:
1009-            buckets.update(tracker.buckets)
1010-            for shnum in tracker.buckets:
1011-                self._server_trackers[shnum] = tracker
1012-                servermap.setdefault(shnum, set()).add(tracker.get_serverid())
1013-        assert len(buckets) == sum([len(tracker.buckets)
1014-                                    for tracker in upload_trackers]), \
1015+
1016+        for server in upload_servers:
1017+            buckets.update(server.buckets)
1018+            for shnum in server.buckets:
1019+                self._server_trackers[shnum] = server
1020+        assert len(buckets) == sum([len(server.buckets) for server in upload_servers]), \
1021             "%s (%s) != %s (%s)" % (
1022                 len(buckets),
1023                 buckets,
1024hunk ./src/allmydata/immutable/upload.py 1151
1025-                sum([len(tracker.buckets) for tracker in upload_trackers]),
1026-                [(t.buckets, t.get_serverid()) for t in upload_trackers]
1027+                sum([len(server.buckets) for server in upload_servers]),
1028+                [(s.buckets, s.get_serverid()) for s in upload_servers]
1029                 )
1030hunk ./src/allmydata/immutable/upload.py 1154
1031-        encoder.set_shareholders(buckets, servermap)
1032+        encoder.set_shareholders(buckets, happiness_graph)
1033 
1034     def _encrypted_done(self, verifycap):
1035         """ Returns a Deferred that will fire with the UploadResults instance. """
1036}
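Between the two patches, it may help to spell out the placement loop that the new Tahoe2ServerSelector implements: compute share assignments, check them, allocate buckets, drop failed or full servers, and repeat. The following is a rough, synchronous sketch rather than code from the patch: `selector` stands in for the HappinessGraph (method names follow how the patch uses it), `allocate` stands in for the allocate_buckets() round trip, and assignments are treated as a dict mapping serverid to share numbers, as _make_write_buckets does.

    class UploadUnhappinessError(Exception):
        pass

    def place_shares(selector, writable_serverids, allocate, repairing=False):
        writable = list(writable_serverids)
        while True:
            selector.add_servers(writable)
            assignments = selector.get_share_assignments()

            if not selector.is_healthy() and not (repairing and assignments):
                # Not repairing (or nothing can be placed at all): fail loudly.
                raise UploadUnhappinessError(selector.get_failure_message())

            for serverid, shnums in assignments.items():
                try:
                    alreadygot, allocated = allocate(serverid, shnums)
                except Exception:
                    # Broken server: forget it and let the matching be redone.
                    selector.mark_bad_server(serverid)
                    writable = [s for s in writable if s != serverid]
                    continue
                for shnum in alreadygot:
                    # Found a pre-existing share; feed the edge back in.
                    selector.add_server_with_share(serverid, shnum)
                if not allocated:
                    # The server is full; stop assigning new shares to it.
                    selector.mark_full_server(serverid)
                    writable = [s for s in writable if s != serverid]
                for shnum in allocated:
                    selector.confirm_share_allocation(serverid, shnum)

            if not selector.needs_recomputation():
                return assignments

Each additional pass through the loop only happens after at least one server has been marked bad or full, so the sketch (like the real code) terminates once it runs out of servers or reaches a stable, acceptable placement.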
1037[immutable/encode.py: Alter the encoder to work with an IPeerSelector object
1038Kevan Carstensen <kevan@isnotajoke.com>**20110424232019
1039 Ignore-this: d3206059675f684e7b97dafb04895e9e
1040] {
1041hunk ./src/allmydata/immutable/encode.py 195
1042         else:
1043             raise KeyError("unknown parameter name '%s'" % name)
1044 
1045-    def set_shareholders(self, landlords, servermap):
1046+    def set_shareholders(self, landlords, graph):
1047         assert isinstance(landlords, dict)
1048         for k in landlords:
1049             assert IStorageBucketWriter.providedBy(landlords[k])
1050hunk ./src/allmydata/immutable/encode.py 200
1051         self.landlords = landlords.copy()
1052-        assert isinstance(servermap, dict)
1053-        for v in servermap.itervalues():
1054-            assert isinstance(v, set)
1055-        self.servermap = servermap.copy()
1056+        self.g = graph
1057+        # TODO: Use this.
1058+        self.servermap = {}
1059 
1060     def start(self):
1061         """ Returns a Deferred that will fire with the verify cap (an instance of
1062hunk ./src/allmydata/immutable/encode.py 484
1063             self.landlords[shareid].abort()
1064             peerid = self.landlords[shareid].get_peerid()
1065             assert peerid
1066+
1067             del self.landlords[shareid]
1068hunk ./src/allmydata/immutable/encode.py 486
1069-            self.servermap[shareid].remove(peerid)
1070-            if not self.servermap[shareid]:
1071-                del self.servermap[shareid]
1072+            self.g.mark_bad_server(peerid)
1073         else:
1074             # even more UNUSUAL
1075             self.log("they weren't in our list of landlords", parent=ln,
1076hunk ./src/allmydata/immutable/encode.py 491
1077                      level=log.WEIRD, umid="TQGFRw")
1078-        happiness = happinessutil.servers_of_happiness(self.servermap)
1079-        if happiness < self.servers_of_happiness:
1080-            peerids = set(happinessutil.shares_by_server(self.servermap).keys())
1081-            msg = happinessutil.failure_message(len(peerids),
1082-                                                self.required_shares,
1083-                                                self.servers_of_happiness,
1084-                                                happiness)
1085-            msg = "%s: %s" % (msg, why)
1086-            raise UploadUnhappinessError(msg)
1087+        # Ask the graph whether it makes sense to continue our work.
1088+        self.g.get_share_assignments(shallow=True)
1089+        if not self.g.is_healthy():
1090+            edges = self.g.get_share_assignments()
1091+            peers = [x[0] for x in edges]
1092+            peerids = set(peers)
1093+
1094+            msg = self.g.get_failure_message()
1095+            raise UploadUnhappinessError("%s: %s" % (msg, why))
1096+
1097         self.log("but we can still continue with %s shares, we'll be happy "
1098hunk ./src/allmydata/immutable/encode.py 502
1099-                 "with at least %s" % (happiness,
1100+                 "with at least %s" % (5, # XXX
1101                                        self.servers_of_happiness),
1102                  parent=ln)
1103 
1104}
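
A rough sketch of the encoder's new failure path (not part of the darcs patch): when a bucket write fails, the encoder no longer recomputes happiness from its own servermap; it tells the graph about the bad server and asks whether the upload can still be happy. This assumes the code lives in encode.py, where UploadUnhappinessError is already imported; the helper name is hypothetical.

    def drop_shareholder(g, peerid, why):
        # forget every peer=>share edge involving the failed peer
        g.mark_bad_server(peerid)
        # shallow=True: re-evaluate using only shares already placed,
        # without trying to assign new ones
        g.get_share_assignments(shallow=True)
        if not g.is_healthy():
            raise UploadUnhappinessError("%s: %s" % (g.get_failure_message(), why))
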
1105[interfaces.py: Add IPeerSelector interfaces
1106Kevan Carstensen <kevan@isnotajoke.com>**20110424232047
1107 Ignore-this: 63837f056d3f7753e76ca8105443e093
1108] hunk ./src/allmydata/interfaces.py 606
1109     """I am a node which represents a file: a sequence of bytes. I am not a
1110     container, like IDirectoryNode."""
1111 
1112+class IPeerSelector(Interface):
1113+    """
1114+    I select peers for an upload, maximizing some measure of health.
1115+
1116+    I keep track of the state of a grid relative to a file. This means
1117+    that I know about all of the peers that parts of that file could be
1118+    placed on, and about shares that have been placed on those peers.
1119+    Given this, I assign shares to peers in a way that maximizes the
1120+    file's health according to whichever definition of health I am
1121+    programmed with. I tell the uploader whether or not my assignment is
1122+    healthy. I keep track of failures during the process and update my
1123+    conclusions appropriately.
1124+    """
1125+    def add_peer_with_share(peerid, shnum):
1126+        """
1127+        Update my internal state to reflect the fact that peer peerid
1128+        holds share shnum. Called for shares that are detected before
1129+        peer selection begins.
1130+        """
1131+
1132+    def confirm_share_allocation(peerid, shnum):
1133+        """
1134+        Confirm that an allocated peer=>share pairing has been
1135+        successfully established.
1136+        """
1137+
1138+    def add_peers(peerids=set):
1139+        """
1140+        Update my internal state to include the peers in peerids as
1141+        potential candidates for storing a file.
1142+        """
1143+
1144+    def mark_full_peer(peerid):
1145+        """
1146+        Mark the peer peerid as full. This means that any
1147+        peer-with-share relationships I know about for peerid remain
1148+        valid, but that peerid will not be assigned any new shares.
1149+        """
1150+
1151+    def mark_bad_peer(peerid):
1152+        """
1153+        Mark the peer peerid as bad. This is typically called when an
1154+        error is encountered when communicating with a peer. I will
1155+        disregard any existing peer => share relationships associated
1156+        with peerid, and will not attempt to assign it any more shares.
1157+        """
1158+
1159+    def get_share_assignments(shallow=False):
1160+        """
1161+        Given what I know about the state of the grid, I'll attempt to
1162+        assign shares to peers in a way that maximizes my definition of
1163+        file health. I'll return a dict mapping peerids to sets of
1164+        share numbers with my decision.
1165+
1166+        I have an optional shallow parameter. If True, I will not
1167+        attempt to assign any new shares before checking happiness;
1168+        instead, I will rely only on the shares that I know are already
1169+        placed.
1170+        """
1171+
1172+    def is_healthy():
1173+        """
1174+        I return whether the share assignments I'm currently using
1175+        reflect a healthy file, based on my internal definitions.
1176+        """
1177+
1178+    def needs_recomputation():
1179+        """
1180+        I return True if the share assignments I last returned may have
1181+        become stale. This is a hint to the caller that they should call
1182+        get_share_assignments again.
1183+        """
1184+
1185 class IImmutableFileNode(IFileNode):
1186     def read(consumer, offset=0, size=None):
1187         """Download a portion (possibly all) of the file's contents, making
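
A rough sketch of how a caller is expected to drive an IPeerSelector during one selection round (not part of the darcs patch; selector, writable, existing, and allocate are hypothetical stand-ins, and the HappinessGraph implementation later in this bundle spells these methods with server-flavored names such as add_servers, add_server_with_share, and mark_full_server):

    def run_selection(selector, writable, existing, allocate):
        selector.add_peers(writable)                  # peers we may write to
        for (peerid, shnum) in existing:              # shares already on the grid
            selector.add_peer_with_share(peerid, shnum)
        assignments = selector.get_share_assignments()   # peerid -> set(shnums)
        for peerid, shnums in assignments.items():
            for shnum in shnums:
                if allocate(peerid, shnum):           # ask the server for a bucket
                    selector.confirm_share_allocation(peerid, shnum)
                else:
                    selector.mark_full_peer(peerid)
        # if selector.needs_recomputation() is True here, another call to
        # get_share_assignments() is warranted before giving up
        return selector.is_healthy()
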
1188[test: fix existing tests to work with the new uploader + graph combination
1189Kevan Carstensen <kevan@isnotajoke.com>**20110424232116
1190 Ignore-this: 5adfadd33b693d06997183f70c1cfa1c
1191] {
1192hunk ./src/allmydata/test/test_cli.py 2415
1193         # enough shares. The one remaining share might be in either the
1194         # COMPLETE or the PENDING state.
1195         in_complete_msg = "ran out of shares: complete=sh0 pending= overdue= unused= need 3"
1196-        in_pending_msg = "ran out of shares: complete= pending=Share(sh0-on-fob7vqgd) overdue= unused= need 3"
1197+        in_pending_msg = "ran out of shares: complete= pending=Share(sh0-on"
1198 
1199         d.addCallback(lambda ign: self.do_cli("get", self.uri_1share))
1200         def _check1((rc, out, err)):
1201hunk ./src/allmydata/test/test_download.py 297
1202             # find the shares that were used and delete them
1203             shares = self.n._cnode._node._shares
1204             shnums = sorted([s._shnum for s in shares])
1205-            self.failUnlessEqual(shnums, [0,1,2,3])
1206+            self.failUnlessEqual(len(shnums), 4)
1207+            self._dead_shnums = shnums
1208 
1209             # break the RIBucketReader references
1210             # (we don't break the RIStorageServer references, because that
1211hunk ./src/allmydata/test/test_download.py 315
1212             self.failUnlessEqual("".join(c.chunks), plaintext)
1213             shares = self.n._cnode._node._shares
1214             shnums = sorted([s._shnum for s in shares])
1215-            self.failIfEqual(shnums, [0,1,2,3])
1216+            self.failIfEqual(shnums, self._dead_shnums)
1217         d.addCallback(_check_failover)
1218         return d
1219 
1220hunk ./src/allmydata/test/test_upload.py 148
1221                           for shnum in sharenums]),
1222                     )
1223 
1224+    def get_buckets(self, storage_index):
1225+        return [x[1] for x in filter(lambda x: x[0] == storage_index,
1226+                                     self.allocated)]
1227+
1228+
1229 class FakeBucketWriter:
1230     # a diagnostic version of storageserver.BucketWriter
1231     def __init__(self, size):
1232hunk ./src/allmydata/test/test_upload.py 418
1233 
1234     def test_first_error_all(self):
1235         self.make_node("first-fail")
1236+        self.set_encoding_parameters(k=25, happy=1, n=100)
1237         d = self.shouldFail(UploadUnhappinessError, "first_error_all",
1238                             "server selection failed",
1239                             upload_data, self.u, DATA)
1240hunk ./src/allmydata/test/test_upload.py 444
1241         # enough to ensure a second pass with 100 shares).
1242         mode = dict([(0,"good")] + [(i,"second-fail") for i in range(1,40)])
1243         self.make_node(mode, 40)
1244+        self.set_encoding_parameters(k=25, happy=1, n=100)
1245         d = upload_data(self.u, DATA)
1246         d.addCallback(extract_uri)
1247         d.addCallback(self._check_large, SIZE_LARGE)
1248hunk ./src/allmydata/test/test_upload.py 452
1249 
1250     def test_second_error_all(self):
1251         self.make_node("second-fail")
1252-        d = self.shouldFail(UploadUnhappinessError, "second_error_all",
1253+        self.set_encoding_parameters(k=25, happy=1, n=100)
1254+        # The upload won't ask any peer to allocate shares more than once on
1255+        # the first try, so we have to upload twice to see that the
1256+        # errors are detected.
1257+        d = upload_data(self.u, DATA)
1258+        d.addCallback(lambda ignored:
1259+            self.shouldFail(UploadUnhappinessError, "second_error_all",
1260                             "server selection failed",
1261hunk ./src/allmydata/test/test_upload.py 460
1262-                            upload_data, self.u, DATA)
1263+                            upload_data, self.u, DATA))
1264         def _check((f,)):
1265             self.failUnlessIn("placed 10 shares out of 100 total", str(f.value))
1266             # there should also be a 'last failure was' message
1267hunk ./src/allmydata/test/test_upload.py 465
1268             self.failUnlessIn("ServerError", str(f.value))
1269-        d.addCallback(_check)
1270+        #d.addCallback(_check)
1271         return d
1272 
1273 class FullServer(unittest.TestCase):
1274hunk ./src/allmydata/test/test_upload.py 545
1275             for s in self.node.last_servers:
1276                 allocated = s.allocated
1277                 self.failUnlessEqual(len(allocated), 2)
1278-                self.failUnlessEqual(s.queries, 2)
1279+                self.failUnlessEqual(s.queries, 1)
1280         d.addCallback(_check)
1281         return d
1282 
1283hunk ./src/allmydata/test/test_upload.py 569
1284                     self.failUnlessEqual(s.queries, 1)
1285                     got_one.append(s)
1286                 else:
1287-                    self.failUnlessEqual(s.queries, 2)
1288+                    self.failUnlessEqual(s.queries, 1)
1289                     got_two.append(s)
1290             self.failUnlessEqual(len(got_one), 49)
1291             self.failUnlessEqual(len(got_two), 1)
1292hunk ./src/allmydata/test/test_upload.py 593
1293             for s in self.node.last_servers:
1294                 allocated = s.allocated
1295                 self.failUnlessEqual(len(allocated), 4)
1296-                self.failUnlessEqual(s.queries, 2)
1297+                self.failUnlessEqual(s.queries, 1)
1298         d.addCallback(_check)
1299         return d
1300 
1301hunk ./src/allmydata/test/test_upload.py 796
1302         d = selector.get_shareholders(broker, sh, storage_index,
1303                                       share_size, block_size, num_segments,
1304                                       10, 3, 4)
1305-        def _have_shareholders((upload_trackers, already_servers)):
1306-            assert servers_to_break <= len(upload_trackers)
1307+        def _have_shareholders(graph):
1308+            assert isinstance(graph, HappinessGraph)
1309+
1310+            upload_servers = graph.get_trackers()
1311+
1312+            assert servers_to_break <= len(upload_servers)
1313             for index in xrange(servers_to_break):
1314hunk ./src/allmydata/test/test_upload.py 803
1315-                tracker = list(upload_trackers)[index]
1316-                for share in tracker.buckets.keys():
1317-                    tracker.buckets[share].abort()
1318+                server = list(upload_servers)[index]
1319+                for share in server.buckets.keys():
1320+                    server.buckets[share].abort()
1321             buckets = {}
1322hunk ./src/allmydata/test/test_upload.py 807
1323-            servermap = already_servers.copy()
1324-            for tracker in upload_trackers:
1325-                buckets.update(tracker.buckets)
1326-                for bucket in tracker.buckets:
1327-                    servermap.setdefault(bucket, set()).add(tracker.get_serverid())
1328-            encoder.set_shareholders(buckets, servermap)
1329+            for server in upload_servers:
1330+                buckets.update(server.buckets)
1331+
1332+            encoder.set_shareholders(buckets, graph)
1333             d = encoder.start()
1334             return d
1335         d.addCallback(_have_shareholders)
1336hunk ./src/allmydata/test/test_upload.py 906
1337             self.failUnlessEqual(data["count-shares-expected"], 12)
1338         d.addCallback(_check)
1339         return d
1340+    test_configure_parameters.timeout = 10
1341 
1342 
1343     def _setUp(self, ns):
1344hunk ./src/allmydata/test/test_upload.py 971
1345         d.addCallback(lambda ign:
1346             self.failUnless(self._has_happy_share_distribution()))
1347         return d
1348+    test_aborted_shares.timeout = 20
1349 
1350 
1351     def test_problem_layout_comment_52(self):
1352hunk ./src/allmydata/test/test_upload.py 1210
1353         d.addCallback(lambda ign:
1354             self.failUnless(self._has_happy_share_distribution()))
1355         return d
1356-    test_problem_layout_ticket_1124.todo = "Fix this after 1.7.1 release."
1357 
1358     def test_happiness_with_some_readonly_servers(self):
1359         # Try the following layout
1360hunk ./src/allmydata/test/test_upload.py 1330
1361         d.addCallback(_remove_server)
1362         d.addCallback(lambda ign:
1363             self.shouldFail(UploadUnhappinessError,
1364-                            "test_dropped_servers_in_encoder",
1365+                            "test_dropped_servers_in_encoder_1",
1366                             "shares could be placed on only 3 server(s) "
1367                             "such that any 3 of them have enough shares to "
1368                             "recover the file, but we were asked to place "
1369hunk ./src/allmydata/test/test_upload.py 1359
1370         d.addCallback(_remove_server)
1371         d.addCallback(lambda ign:
1372             self.shouldFail(UploadUnhappinessError,
1373-                            "test_dropped_servers_in_encoder",
1374+                            "test_dropped_servers_in_encoder_2",
1375                             "shares could be placed on only 3 server(s) "
1376                             "such that any 3 of them have enough shares to "
1377                             "recover the file, but we were asked to place "
1378hunk ./src/allmydata/test/test_upload.py 1367
1379                             self._do_upload_with_broken_servers, 2))
1380         return d
1381 
1382-
1383-    def test_merge_servers(self):
1384-        # merge_servers merges a list of upload_servers and a dict of
1385-        # shareid -> serverid mappings.
1386-        shares = {
1387-                    1 : set(["server1"]),
1388-                    2 : set(["server2"]),
1389-                    3 : set(["server3"]),
1390-                    4 : set(["server4", "server5"]),
1391-                    5 : set(["server1", "server2"]),
1392-                 }
1393-        # if not provided with a upload_servers argument, it should just
1394-        # return the first argument unchanged.
1395-        self.failUnlessEqual(shares, merge_servers(shares, set([])))
1396-        trackers = []
1397-        for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1398-            t = FakeServerTracker(server, [i])
1399-            trackers.append(t)
1400-        expected = {
1401-                    1 : set(["server1"]),
1402-                    2 : set(["server2"]),
1403-                    3 : set(["server3"]),
1404-                    4 : set(["server4", "server5"]),
1405-                    5 : set(["server1", "server2", "server5"]),
1406-                    6 : set(["server6"]),
1407-                    7 : set(["server7"]),
1408-                    8 : set(["server8"]),
1409-                   }
1410-        self.failUnlessEqual(expected, merge_servers(shares, set(trackers)))
1411-        shares2 = {}
1412-        expected = {
1413-                    5 : set(["server5"]),
1414-                    6 : set(["server6"]),
1415-                    7 : set(["server7"]),
1416-                    8 : set(["server8"]),
1417-                   }
1418-        self.failUnlessEqual(expected, merge_servers(shares2, set(trackers)))
1419-        shares3 = {}
1420-        trackers = []
1421-        expected = {}
1422-        for (i, server) in [(i, "server%d" % i) for i in xrange(10)]:
1423-            shares3[i] = set([server])
1424-            t = FakeServerTracker(server, [i])
1425-            trackers.append(t)
1426-            expected[i] = set([server])
1427-        self.failUnlessEqual(expected, merge_servers(shares3, set(trackers)))
1428-
1429-
1430-    def test_servers_of_happiness_utility_function(self):
1431-        # These tests are concerned with the servers_of_happiness()
1432-        # utility function, and its underlying matching algorithm. Other
1433-        # aspects of the servers_of_happiness behavior are tested
1434-        # elsehwere These tests exist to ensure that
1435-        # servers_of_happiness doesn't under or overcount the happiness
1436-        # value for given inputs.
1437-
1438-        # servers_of_happiness expects a dict of
1439-        # shnum => set(serverids) as a preexisting shares argument.
1440-        test1 = {
1441-                 1 : set(["server1"]),
1442-                 2 : set(["server2"]),
1443-                 3 : set(["server3"]),
1444-                 4 : set(["server4"])
1445-                }
1446-        happy = servers_of_happiness(test1)
1447-        self.failUnlessEqual(4, happy)
1448-        test1[4] = set(["server1"])
1449-        # We've added a duplicate server, so now servers_of_happiness
1450-        # should be 3 instead of 4.
1451-        happy = servers_of_happiness(test1)
1452-        self.failUnlessEqual(3, happy)
1453-        # The second argument of merge_servers should be a set of objects with
1454-        # serverid and buckets as attributes. In actual use, these will be
1455-        # ServerTracker instances, but for testing it is fine to make a
1456-        # FakeServerTracker whose job is to hold those instance variables to
1457-        # test that part.
1458-        trackers = []
1459-        for (i, server) in [(i, "server%d" % i) for i in xrange(5, 9)]:
1460-            t = FakeServerTracker(server, [i])
1461-            trackers.append(t)
1462-        # Recall that test1 is a server layout with servers_of_happiness
1463-        # = 3.  Since there isn't any overlap between the shnum ->
1464-        # set([serverid]) correspondences in test1 and those in trackers,
1465-        # the result here should be 7.
1466-        test2 = merge_servers(test1, set(trackers))
1467-        happy = servers_of_happiness(test2)
1468-        self.failUnlessEqual(7, happy)
1469-        # Now add an overlapping server to trackers. This is redundant,
1470-        # so it should not cause the previously reported happiness value
1471-        # to change.
1472-        t = FakeServerTracker("server1", [1])
1473-        trackers.append(t)
1474-        test2 = merge_servers(test1, set(trackers))
1475-        happy = servers_of_happiness(test2)
1476-        self.failUnlessEqual(7, happy)
1477-        test = {}
1478-        happy = servers_of_happiness(test)
1479-        self.failUnlessEqual(0, happy)
1480-        # Test a more substantial overlap between the trackers and the
1481-        # existing assignments.
1482-        test = {
1483-            1 : set(['server1']),
1484-            2 : set(['server2']),
1485-            3 : set(['server3']),
1486-            4 : set(['server4']),
1487-        }
1488-        trackers = []
1489-        t = FakeServerTracker('server5', [4])
1490-        trackers.append(t)
1491-        t = FakeServerTracker('server6', [3, 5])
1492-        trackers.append(t)
1493-        # The value returned by servers_of_happiness is the size
1494-        # of a maximum matching in the bipartite graph that
1495-        # servers_of_happiness() makes between serverids and share
1496-        # numbers. It should find something like this:
1497-        # (server 1, share 1)
1498-        # (server 2, share 2)
1499-        # (server 3, share 3)
1500-        # (server 5, share 4)
1501-        # (server 6, share 5)
1502-        #
1503-        # and, since there are 5 edges in this matching, it should
1504-        # return 5.
1505-        test2 = merge_servers(test, set(trackers))
1506-        happy = servers_of_happiness(test2)
1507-        self.failUnlessEqual(5, happy)
1508-        # Zooko's first puzzle:
1509-        # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156)
1510-        #
1511-        # server 1: shares 0, 1
1512-        # server 2: shares 1, 2
1513-        # server 3: share 2
1514-        #
1515-        # This should yield happiness of 3.
1516-        test = {
1517-            0 : set(['server1']),
1518-            1 : set(['server1', 'server2']),
1519-            2 : set(['server2', 'server3']),
1520-        }
1521-        self.failUnlessEqual(3, servers_of_happiness(test))
1522-        # Zooko's second puzzle:
1523-        # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158)
1524-        #
1525-        # server 1: shares 0, 1
1526-        # server 2: share 1
1527-        #
1528-        # This should yield happiness of 2.
1529-        test = {
1530-            0 : set(['server1']),
1531-            1 : set(['server1', 'server2']),
1532-        }
1533-        self.failUnlessEqual(2, servers_of_happiness(test))
1534-
1535-
1536-    def test_shares_by_server(self):
1537-        test = dict([(i, set(["server%d" % i])) for i in xrange(1, 5)])
1538-        sbs = shares_by_server(test)
1539-        self.failUnlessEqual(set([1]), sbs["server1"])
1540-        self.failUnlessEqual(set([2]), sbs["server2"])
1541-        self.failUnlessEqual(set([3]), sbs["server3"])
1542-        self.failUnlessEqual(set([4]), sbs["server4"])
1543-        test1 = {
1544-                    1 : set(["server1"]),
1545-                    2 : set(["server1"]),
1546-                    3 : set(["server1"]),
1547-                    4 : set(["server2"]),
1548-                    5 : set(["server2"])
1549-                }
1550-        sbs = shares_by_server(test1)
1551-        self.failUnlessEqual(set([1, 2, 3]), sbs["server1"])
1552-        self.failUnlessEqual(set([4, 5]), sbs["server2"])
1553-        # This should fail unless the serverid part of the mapping is a set
1554-        test2 = {1: "server1"}
1555-        self.shouldFail(AssertionError,
1556-                       "test_shares_by_server",
1557-                       "",
1558-                       shares_by_server, test2)
1559-
1560-
1561     def test_existing_share_detection(self):
1562         self.basedir = self.mktemp()
1563         d = self._setup_and_upload()
1564hunk ./src/allmydata/test/test_upload.py 1403
1565         d.addCallback(lambda ign:
1566             self.failUnless(self._has_happy_share_distribution()))
1567         return d
1568+    test_existing_share_detection.timeout = 20
1569 
1570 
1571     def test_query_counting(self):
1572hunk ./src/allmydata/test/test_upload.py 1407
1573-        # If server selection fails, Tahoe2ServerSelector prints out a lot
1574+        # If server selection fails, Tahoe2PeerSelector prints out a lot
1575         # of helpful diagnostic information, including query stats.
1576         # This test helps make sure that that information is accurate.
1577         self.basedir = self.mktemp()
1578hunk ./src/allmydata/test/test_upload.py 1426
1579         d.addCallback(_setup)
1580         d.addCallback(lambda c:
1581             self.shouldFail(UploadUnhappinessError, "test_query_counting",
1582-                            "10 queries placed some shares",
1583+                            "0 queries placed some shares",
1584                             c.upload, upload.Data("data" * 10000,
1585                                                   convergence="")))
1586         # Now try with some readonly servers. We want to make sure that
1587hunk ./src/allmydata/test/test_upload.py 1501
1588         d.addCallback(lambda client:
1589             self.shouldFail(UploadUnhappinessError,
1590                             "test_upper_limit_on_readonly_queries",
1591-                            "sent 8 queries to 8 servers",
1592+                            "sent 10 queries to 10 servers",
1593                             client.upload,
1594                             upload.Data('data' * 10000, convergence="")))
1595         return d
1596hunk ./src/allmydata/test/test_upload.py 1535
1597             return client
1598         d.addCallback(_reset_encoding_parameters)
1599         d.addCallback(lambda client:
1600-            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1601+            self.shouldFail(UploadUnhappinessError,
1602+                            "test_selection_exceptions_1",
1603                             "placed 0 shares out of 10 "
1604                             "total (10 homeless), want to place shares on at "
1605                             "least 4 servers such that any 3 of them have "
1606hunk ./src/allmydata/test/test_upload.py 1577
1607             self.g.remove_server(self.g.servers_by_number[0].my_nodeid))
1608         d.addCallback(_reset_encoding_parameters)
1609         d.addCallback(lambda client:
1610-            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1611+            self.shouldFail(UploadUnhappinessError,
1612+                            "test_selection_exceptions_2",
1613                             "placed 0 shares out of 10 "
1614                             "total (10 homeless), want to place shares on at "
1615                             "least 4 servers such that any 3 of them have "
1616hunk ./src/allmydata/test/test_upload.py 1583
1617                             "enough shares to recover the file, "
1618-                            "sent 5 queries to 5 servers, 0 queries placed "
1619-                            "some shares, 5 placed none "
1620+                            "sent 4 queries to 4 servers, 0 queries placed "
1621+                            "some shares, 4 placed none "
1622                             "(of which 4 placed none due to the server being "
1623hunk ./src/allmydata/test/test_upload.py 1586
1624-                            "full and 1 placed none due to an error)",
1625+                            "full and 0 placed none due to an error)",
1626                             client.upload,
1627                             upload.Data("data" * 10000, convergence="")))
1628         # server 0, server 1 = empty, accepting shares
1629hunk ./src/allmydata/test/test_upload.py 1599
1630             self._add_server(server_number=1))
1631         d.addCallback(_reset_encoding_parameters)
1632         d.addCallback(lambda client:
1633-            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1634+            self.shouldFail(UploadUnhappinessError,
1635+                            "test_selection_exceptions_3",
1636                             "shares could be placed or found on only 2 "
1637                             "server(s). We were asked to place shares on at "
1638                             "least 4 server(s) such that any 3 of them have "
1639hunk ./src/allmydata/test/test_upload.py 1625
1640             self._add_server(server_number=4))
1641         d.addCallback(_reset_encoding_parameters, happy=7)
1642         d.addCallback(lambda client:
1643-            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1644+            self.shouldFail(UploadUnhappinessError,
1645+                            "test_selection_exceptions_4",
1646                             "shares could be placed on only 5 server(s) such "
1647                             "that any 3 of them have enough shares to recover "
1648                             "the file, but we were asked to place shares on "
1649hunk ./src/allmydata/test/test_upload.py 1655
1650                                         readonly=True))
1651         d.addCallback(_reset_encoding_parameters, happy=7)
1652         d.addCallback(lambda client:
1653-            self.shouldFail(UploadUnhappinessError, "test_selection_exceptions",
1654+            self.shouldFail(UploadUnhappinessError,
1655+                            "test_selection_exceptions_5",
1656                             "shares could be placed or found on 4 server(s), "
1657                             "but they are not spread out evenly enough to "
1658                             "ensure that any 3 of these servers would have "
1659hunk ./src/allmydata/test/test_upload.py 1701
1660         d.addCallback(lambda ign:
1661             self.failUnless(self._has_happy_share_distribution()))
1662         return d
1663-    test_problem_layout_comment_187.todo = "this isn't fixed yet"
1664 
1665     def test_problem_layout_ticket_1118(self):
1666         # #1118 includes a report from a user who hit an assertion in
1667hunk ./src/allmydata/test/test_upload.py 1730
1668             return client
1669 
1670         d.addCallback(_setup)
1671-        # Note: actually it should succeed! See
1672-        # test_problem_layout_ticket_1128. But ticket 1118 is just to
1673-        # make it realize that it has failed, so if it raises
1674-        # UploadUnhappinessError then we'll give it the green light
1675-        # for now.
1676         d.addCallback(lambda ignored:
1677hunk ./src/allmydata/test/test_upload.py 1731
1678-            self.shouldFail(UploadUnhappinessError,
1679-                            "test_problem_layout_ticket_1118",
1680-                            "",
1681-                            self.g.clients[0].upload, upload.Data("data" * 10000,
1682-                                                       convergence="")))
1683+            self.g.clients[0].upload(upload.Data("data" * 1000,
1684+                                                 convergence="")))
1685         return d
1686 
1687     def test_problem_layout_ticket_1128(self):
1688hunk ./src/allmydata/test/test_upload.py 1768
1689         d.addCallback(lambda ign:
1690             self.failUnless(self._has_happy_share_distribution()))
1691         return d
1692-    test_problem_layout_ticket_1128.todo = "Invent a smarter uploader that uploads successfully in this case."
1693 
1694     def test_upload_succeeds_with_some_homeless_shares(self):
1695         # If the upload is forced to stop trying to place shares before
1696hunk ./src/allmydata/test/test_web.py 4276
1697                               " overdue= unused= need 3. Last failure: None")
1698             msg2 = msgbase + (" ran out of shares:"
1699                               " complete="
1700-                              " pending=Share(sh0-on-xgru5)"
1701+                              " pending=Share(sh0-on-ysbz4st7)"
1702                               " overdue= unused= need 3. Last failure: None")
1703             self.failUnless(body == msg1 or body == msg2, body)
1704         d.addCallback(_check_one_share)
1705}
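
The final patch in this bundle adds direct tests for the HappinessGraph object. A hypothetical minimal example of that shape, using only the constructor signature and methods defined in util/happinessutil.py below (the actual tests in the bundle may differ):

    from allmydata.util.happinessutil import HappinessGraph

    def sketch_happiness_round():
        g = HappinessGraph(3, 7, 10)        # k=3, happy=7, n=10
        g.add_servers(["server%d" % i for i in range(10)])
        assignments = g.get_share_assignments()   # serverid -> set(shnums)
        for server, shares in assignments.items():
            for share in shares:
                g.confirm_share_allocation(server, share)
        return g.is_healthy()
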
1706[util/happinessutil.py: Abstract happiness functions behind an IPeerSelector-conforming object
1707Kevan Carstensen <kevan@isnotajoke.com>**20110503034926
1708 Ignore-this: ad14c5044baf64068faf53862107c3
1709] {
1710hunk ./src/allmydata/util/happinessutil.py 5
1711 I contain utilities useful for calculating servers_of_happiness, and for
1712 reporting it in messages
1713 """
1714+from allmydata.util import mathutil
1715+from allmydata.interfaces import IPeerSelector
1716+from zope.interface import implements
1717 
1718 from copy import deepcopy
1719 
1720hunk ./src/allmydata/util/happinessutil.py 46
1721     return msg
1722 
1723 
1724-def shares_by_server(servermap):
1725+class HappinessGraph:
1726+    implements(IPeerSelector)
1727     """
1728hunk ./src/allmydata/util/happinessutil.py 49
1729-    I accept a dict of shareid -> set(peerid) mappings, and return a
1730-    dict of peerid -> set(shareid) mappings. My argument is a dictionary
1731-    with sets of peers, indexed by shares, and I transform that into a
1732-    dictionary of sets of shares, indexed by peerids.
1733+    I am a bipartite graph that the servers of happiness code uses to
1734+    determine whether or not an upload is correctly distributed.
1735     """
1736hunk ./src/allmydata/util/happinessutil.py 52
1737-    ret = {}
1738-    for shareid, peers in servermap.iteritems():
1739-        assert isinstance(peers, set)
1740-        for peerid in peers:
1741-            ret.setdefault(peerid, set()).add(shareid)
1742-    return ret
1743+    def __init__(self, k, happy, n, logparent=None):
1744+        # We should avoid needlessly recomputing the happiness matching.
1745+        # If no changes to the graph have occurred between calls to
1746+        # compute_happiness, return _computed_happiness.
1747+        self._needs_recomputation = True
1748+        self._computed_happiness = 0
1749 
1750hunk ./src/allmydata/util/happinessutil.py 59
1751-def merge_servers(servermap, upload_trackers=None):
1752-    """
1753-    I accept a dict of shareid -> set(serverid) mappings, and optionally a
1754-    set of ServerTrackers. If no set of ServerTrackers is provided, I return
1755-    my first argument unmodified. Otherwise, I update a copy of my first
1756-    argument to include the shareid -> serverid mappings implied in the
1757-    set of ServerTrackers, returning the resulting dict.
1758-    """
1759-    # Since we mutate servermap, and are called outside of a
1760-    # context where it is okay to do that, make a copy of servermap and
1761-    # work with it.
1762-    servermap = deepcopy(servermap)
1763-    if not upload_trackers:
1764-        return servermap
1765+        # set((serverid, shnum))
1766+        # Used to keep track of relationships that we already know
1767+        # about.
1768+        self._existing_edges = set()
1769+        self._allocated_edges = set()
1770+        self._edges_used = set()
1771+        # The subset of allocated edges that was not used in my last
1772+        # returned share assignments.
1773+        self._obsolete_edges = set()
1774 
1775hunk ./src/allmydata/util/happinessutil.py 69
1776-    assert(isinstance(servermap, dict))
1777-    assert(isinstance(upload_trackers, set))
1778+        # serverid => set(shares)
1779+        # Returned to our caller when they ask for share assignments so
1780+        # that the graph computation doesn't have to take place twice.
1781+        self._assignment = {}
1782 
1783hunk ./src/allmydata/util/happinessutil.py 74
1784-    for tracker in upload_trackers:
1785-        for shnum in tracker.buckets:
1786-            servermap.setdefault(shnum, set()).add(tracker.get_serverid())
1787-    return servermap
1788+        # [serverid]
1789+        # The peers that we can assign new shares to.
1790+        self._writable_servers = []
1791 
1792hunk ./src/allmydata/util/happinessutil.py 78
1793-def servers_of_happiness(sharemap):
1794-    """
1795-    I accept 'sharemap', a dict of shareid -> set(peerid) mappings. I
1796-    return the 'servers_of_happiness' number that sharemap results in.
1797+        self._needed_shares = k
1798+        self._total_shares = n
1799+        self._happy = happy
1800 
1801hunk ./src/allmydata/util/happinessutil.py 82
1802-    To calculate the 'servers_of_happiness' number for the sharemap, I
1803-    construct a bipartite graph with servers in one partition of vertices
1804-    and shares in the other, and with an edge between a server s and a share t
1805-    if s is to store t. I then compute the size of a maximum matching in
1806-    the resulting graph; this is then returned as the 'servers_of_happiness'
1807-    for my arguments.
1808+        # TODO: Add the logparent.
1809 
1810hunk ./src/allmydata/util/happinessutil.py 84
1811-    For example, consider the following layout:
1812 
1813hunk ./src/allmydata/util/happinessutil.py 85
1814-      server 1: shares 1, 2, 3, 4
1815-      server 2: share 6
1816-      server 3: share 3
1817-      server 4: share 4
1818-      server 5: share 2
1819+    def add_servers(self, servers):
1820+        """
1821+        I incorporate the serverids in 'servers' into my internal
1822+        representation in such a way as to allow assignment of shares to
1823+        them later.
1824+        """
1825+        assert isinstance(servers, list)
1826 
1827hunk ./src/allmydata/util/happinessutil.py 93
1828-    From this, we can construct the following graph:
1829+        for server in servers:
1830+            if server not in self._writable_servers:
1831+                self._writable_servers.append(server)
1832+                self._needs_recomputation = True
1833 
1834hunk ./src/allmydata/util/happinessutil.py 98
1835-      L = {server 1, server 2, server 3, server 4, server 5}
1836-      R = {share 1, share 2, share 3, share 4, share 6}
1837-      V = L U R
1838-      E = {(server 1, share 1), (server 1, share 2), (server 1, share 3),
1839-           (server 1, share 4), (server 2, share 6), (server 3, share 3),
1840-           (server 4, share 4), (server 5, share 2)}
1841-      G = (V, E)
1842 
1843hunk ./src/allmydata/util/happinessutil.py 99
1844-    Note that G is bipartite since every edge in e has one endpoint in L
1845-    and one endpoint in R.
1846+    def confirm_share_allocation(self, server, share):
1847+        """
1848+        Establish the allocation of share to server.
1849 
1850hunk ./src/allmydata/util/happinessutil.py 103
1851-    A matching in a graph G is a subset M of E such that, for any vertex
1852-    v in V, v is incident to at most one edge of M. A maximum matching
1853-    in G is a matching that is no smaller than any other matching. For
1854-    this graph, a matching of cardinality 5 is:
1855+        This records that allocation in my internal world representation
1856+        and supersedes any existing allocations that may have already
1857+        been made.
1858+        """
1859+        assert (server, share) in self._edges_used
1860 
1861hunk ./src/allmydata/util/happinessutil.py 109
1862-      M = {(server 1, share 1), (server 2, share 6),
1863-           (server 3, share 3), (server 4, share 4),
1864-           (server 5, share 2)}
1865+        # check for existing edges, removing them if necessary.
1866+        filtered_edges = filter(lambda x: x[1] == share,
1867+                                self._allocated_edges)
1868 
1869hunk ./src/allmydata/util/happinessutil.py 113
1870-    Since G is bipartite, and since |L| = 5, we cannot have an M' such
1871-    that |M'| > |M|. Then M is a maximum matching in G. Intuitively, and
1872-    as long as k <= 5, we can see that the layout above has
1873-    servers_of_happiness = 5, which matches the results here.
1874-    """
1875-    if sharemap == {}:
1876-        return 0
1877-    sharemap = shares_by_server(sharemap)
1878-    graph = flow_network_for(sharemap)
1879-    # This is an implementation of the Ford-Fulkerson method for finding
1880-    # a maximum flow in a flow network applied to a bipartite graph.
1881-    # Specifically, it is the Edmonds-Karp algorithm, since it uses a
1882-    # BFS to find the shortest augmenting path at each iteration, if one
1883-    # exists.
1884-    #
1885-    # The implementation here is an adapation of an algorithm described in
1886-    # "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662.
1887-    dim = len(graph)
1888-    flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)]
1889-    residual_graph, residual_function = residual_network(graph, flow_function)
1890-    while augmenting_path_for(residual_graph):
1891-        path = augmenting_path_for(residual_graph)
1892-        # Delta is the largest amount that we can increase flow across
1893-        # all of the edges in path. Because of the way that the residual
1894-        # function is constructed, f[u][v] for a particular edge (u, v)
1895-        # is the amount of unused capacity on that edge. Taking the
1896-        # minimum of a list of those values for each edge in the
1897-        # augmenting path gives us our delta.
1898-        delta = min(map(lambda (u, v): residual_function[u][v], path))
1899-        for (u, v) in path:
1900-            flow_function[u][v] += delta
1901-            flow_function[v][u] -= delta
1902-        residual_graph, residual_function = residual_network(graph,
1903-                                                             flow_function)
1904-    num_servers = len(sharemap)
1905-    # The value of a flow is the total flow out of the source vertex
1906-    # (vertex 0, in our graph). We could just as well sum across all of
1907-    # f[0], but we know that vertex 0 only has edges to the servers in
1908-    # our graph, so we can stop after summing flow across those. The
1909-    # value of a flow computed in this way is the size of a maximum
1910-    # matching on the bipartite graph described above.
1911-    return sum([flow_function[0][v] for v in xrange(1, num_servers+1)])
1912+        # Otherwise, we're asking the uploader to do more work than it
1913+        # has to, since we're telling it to assign shares to something
1914+        # it already assigned shares to.
1915+        for f in filtered_edges:
1916+            assert f not in self._edges_used
1917 
1918hunk ./src/allmydata/util/happinessutil.py 119
1919-def flow_network_for(sharemap):
1920-    """
1921-    I take my argument, a dict of peerid -> set(shareid) mappings, and
1922-    turn it into a flow network suitable for use with Edmonds-Karp. I
1923-    then return the adjacency list representation of that network.
1924+        self._allocated_edges.difference_update(filtered_edges)
1925 
1926hunk ./src/allmydata/util/happinessutil.py 121
1927-    Specifically, I build G = (V, E), where:
1928-      V = { peerid in sharemap } U { shareid in sharemap } U {s, t}
1929-      E = {(s, peerid) for each peerid}
1930-          U {(peerid, shareid) if peerid is to store shareid }
1931-          U {(shareid, t) for each shareid}
1932+        self._allocated_edges.add((server, share))
1933 
1934hunk ./src/allmydata/util/happinessutil.py 123
1935-    s and t will be source and sink nodes when my caller starts treating
1936-    the graph I return like a flow network. Without s and t, the
1937-    returned graph is bipartite.
1938-    """
1939-    # Servers don't have integral identifiers, and we can't make any
1940-    # assumptions about the way shares are indexed -- it's possible that
1941-    # there are missing shares, for example. So before making a graph,
1942-    # we re-index so that all of our vertices have integral indices, and
1943-    # that there aren't any holes. We start indexing at 1, so that we
1944-    # can add a source node at index 0.
1945-    sharemap, num_shares = reindex(sharemap, base_index=1)
1946-    num_servers = len(sharemap)
1947-    graph = [] # index -> [index], an adjacency list
1948-    # Add an entry at the top (index 0) that has an edge to every server
1949-    # in sharemap
1950-    graph.append(sharemap.keys())
1951-    # For each server, add an entry that has an edge to every share that it
1952-    # contains (or will contain).
1953-    for k in sharemap:
1954-        graph.append(sharemap[k])
1955-    # For each share, add an entry that has an edge to the sink.
1956-    sink_num = num_servers + num_shares + 1
1957-    for i in xrange(num_shares):
1958-        graph.append([sink_num])
1959-    # Add an empty entry for the sink, which has no outbound edges.
1960-    graph.append([])
1961-    return graph
1962+        # we don't need to recompute after this.
1963 
1964hunk ./src/allmydata/util/happinessutil.py 125
1965-def reindex(sharemap, base_index):
1966-    """
1967-    Given sharemap, I map peerids and shareids to integers that don't
1968-    conflict with each other, so they're useful as indices in a graph. I
1969-    return a sharemap that is reindexed appropriately, and also the
1970-    number of distinct shares in the resulting sharemap as a convenience
1971-    for my caller. base_index tells me where to start indexing.
1972-    """
1973-    shares  = {} # shareid  -> vertex index
1974-    num = base_index
1975-    ret = {} # peerid -> [shareid], a reindexed sharemap.
1976-    # Number the servers first
1977-    for k in sharemap:
1978-        ret[num] = sharemap[k]
1979-        num += 1
1980-    # Number the shares
1981-    for k in ret:
1982-        for shnum in ret[k]:
1983-            if not shares.has_key(shnum):
1984-                shares[shnum] = num
1985-                num += 1
1986-        ret[k] = map(lambda x: shares[x], ret[k])
1987-    return (ret, len(shares))
1988 
1989hunk ./src/allmydata/util/happinessutil.py 126
1990-def residual_network(graph, f):
1991-    """
1992-    I return the residual network and residual capacity function of the
1993-    flow network represented by my graph and f arguments. graph is a
1994-    flow network in adjacency-list form, and f is a flow in graph.
1995-    """
1996-    new_graph = [[] for i in xrange(len(graph))]
1997-    cf = [[0 for s in xrange(len(graph))] for sh in xrange(len(graph))]
1998-    for i in xrange(len(graph)):
1999-        for v in graph[i]:
2000-            if f[i][v] == 1:
2001-                # We add an edge (v, i) with cf[v,i] = 1. This means
2002-                # that we can remove 1 unit of flow from the edge (i, v)
2003-                new_graph[v].append(i)
2004-                cf[v][i] = 1
2005-                cf[i][v] = -1
2006-            else:
2007-                # We add the edge (i, v), since we're not using it right
2008-                # now.
2009-                new_graph[i].append(v)
2010-                cf[i][v] = 1
2011-                cf[v][i] = -1
2012-    return (new_graph, cf)
2013+    def add_server_with_share(self, server, share):
2014+        """
2015+        I record that the server identified by 'server' is currently
2016+        storing 'share'.
2017+        """
2018+        if (server, share) not in self._existing_edges:
2019+            self._existing_edges.add((server, share))
2020+            self._needs_recomputation = True
2021 
2022hunk ./src/allmydata/util/happinessutil.py 135
2023-def augmenting_path_for(graph):
2024-    """
2025-    I return an augmenting path, if there is one, from the source node
2026-    to the sink node in the flow network represented by my graph argument.
2027-    If there is no augmenting path, I return False. I assume that the
2028-    source node is at index 0 of graph, and the sink node is at the last
2029-    index. I also assume that graph is a flow network in adjacency list
2030-    form.
2031-    """
2032-    bfs_tree = bfs(graph, 0)
2033-    if bfs_tree[len(graph) - 1]:
2034-        n = len(graph) - 1
2035-        path = [] # [(u, v)], where u and v are vertices in the graph
2036-        while n != 0:
2037-            path.insert(0, (bfs_tree[n], n))
2038-            n = bfs_tree[n]
2039-        return path
2040-    return False
2041 
2042hunk ./src/allmydata/util/happinessutil.py 136
2043-def bfs(graph, s):
2044-    """
2045-    Perform a BFS on graph starting at s, where graph is a graph in
2046-    adjacency list form, and s is a node in graph. I return the
2047-    predecessor table that the BFS generates.
2048-    """
2049-    # This is an adaptation of the BFS described in "Introduction to
2050-    # Algorithms", Cormen et al, 2nd ed., p. 532.
2051-    # WHITE vertices are those that we haven't seen or explored yet.
2052-    WHITE = 0
2053-    # GRAY vertices are those we have seen, but haven't explored yet
2054-    GRAY  = 1
2055-    # BLACK vertices are those we have seen and explored
2056-    BLACK = 2
2057-    color        = [WHITE for i in xrange(len(graph))]
2058-    predecessor  = [None for i in xrange(len(graph))]
2059-    distance     = [-1 for i in xrange(len(graph))]
2060-    queue = [s] # vertices that we haven't explored yet.
2061-    color[s] = GRAY
2062-    distance[s] = 0
2063-    while queue:
2064-        n = queue.pop(0)
2065-        for v in graph[n]:
2066-            if color[v] == WHITE:
2067-                color[v] = GRAY
2068-                distance[v] = distance[n] + 1
2069-                predecessor[v] = n
2070-                queue.append(v)
2071-        color[n] = BLACK
2072-    return predecessor
2073+    def mark_bad_server(self, serverid):
2074+        """
2075+        Given a serverid, I remove that serverid from my internal world
2076+        representation. This means that I will not attempt to allocate
2077+        shares to it, nor will I remember any existing shares that I've
2078+        been told that it holds.
2079+        """
2080+        # remove serverid from writable peers, any unplaced share
2081+        # assignments.
2082+        self.mark_full_server(serverid)
2083+
2084+        # If serverid has any existing shares in our estimation, remove
2085+        # those as well.
2086+        existing = filter(lambda x: x[0] != serverid,
2087+                          list(self._existing_edges))
2088+        existing = set(existing)
2089+
2090+        if self._existing_edges != existing:
2091+            self._existing_edges = existing
2092+            self._needs_recomputation = True
2093+
2094+        allocated = filter(lambda x: x[0] != serverid,
2095+                           list(self._allocated_edges))
2096+        allocated = set(allocated)
2097+
2098+        if self._allocated_edges != allocated:
2099+            self._allocated_edges = allocated
2100+            self._needs_recomputation = True
2101+
2102+
2103+    def mark_full_server(self, serverid):
2104+        """
2105+        Given a serverid, I mark that serverid as full in my graph.
2106+        """
2107+        if serverid in self._writable_servers:
2108+            self._writable_servers.remove(serverid)
2109+
2110+        new_edges = filter(lambda x: x[0] != serverid, list(self._edges_used))
2111+        new_edges = set(new_edges)
2112+
2113+        if new_edges != self._edges_used:
2114+            self._edges_used = new_edges
2115+            self._needs_recomputation = True
2116+
2117+
2118+    def get_obsolete_allocations(self):
2119+        Return a sharemap-like structure of all of the allocations that
2120+        I've made (and that peers have honored) which are now obsolete.
2121+        This allows the uploader to cancel those allocations as appropriate.
2122+        the uploader to cancel allocations as appropriate.
2123+        """
2124+        ret = {}
2125+        for (server, share) in self._obsolete_edges:
2126+            ret.setdefault(server, set()).add(share)
2127+
2128+        return ret
2129+
2130+
2131+    def _assign_homeless_shares(self, used_edges):
2132+        """
2133+        I'm a helper method to assign homeless shares.
2134+
2135+        The bipartite matching algorithm only computes the matching. By
2136+        definition, a matching assigns at most one share to each server,
2137+        so if we're trying to place more shares than there are servers,
2138+        we'll have some left over. This method assigns those, trying to
2139+        build off of the existing matching to distribute them evenly.
2140+        """
2141+        assert isinstance(used_edges, set)
2142+
2143+        all_shares = set(range(self._total_shares))
2144+
2145+        comp_edges = used_edges.union(self._existing_edges)
2146+        comp_edges = comp_edges.union(self._allocated_edges)
2147+        for (peer, share) in comp_edges:
2148+            # If a share exists on more than one peer we may have already
2149+            # removed it by the time we get to this statement, which is why the
2150+            # check is necessary.
2151+            if share in all_shares: all_shares.remove(share)
2152+
2153+        # now all_shares is the set of all shares that we can't account
2154+        # for either as a result of our matching or as an existing share
2155+        # placement. these need to be distributed to peers.
2156+        peers_used = set([x[0] for x in used_edges])
2157+        # remove any peers that we can't assign shares to.
2158+        existing_servers = set([x[0] for x in self._existing_edges])
2159+        readonly_servers = \
2160+            existing_servers.difference(self._writable_servers)
2161+
2162+        peers_used.difference_update(readonly_servers)
2163+
2164+        new_edges = set()
2165+        while all_shares and peers_used:
2166+            new_edges.update(zip(peers_used, all_shares))
2167+            for share in [x[1] for x in new_edges]:
2168+                if share in all_shares: all_shares.remove(share)
2169+
2170+        used_edges.update(new_edges)
2171+
2172+        return used_edges
2173+
2174+
2175+    def _filter_edges(self, edges_to_filter):
2176+        """
2177+        Return a canonical subset of the given edges, selected via a maximum matching.
2178+        """
2179+        our_sharemap = {}
2180+
2181+        for (peer, share) in edges_to_filter:
2182+            our_sharemap.setdefault(peer, set()).add(share)
2183+
2184+        edges_used = self._compute_matching(our_sharemap)
2185+
2186+        return edges_used
2187+
2188+
2189+    def _build_sharemap(self, shallow):
2190+        """
2191+        I'm a helper method responsible for synthesizing instance
2192+        information into a sharemap that can then be used by the
2193+        bipartite matching algorithm.
2194+        """
2195+        our_sharemap = {}
2196+
2197+        # Add the shares we already know about.
2198+        all_shares = set(range(self._total_shares))
2199+        peers_used_already = set()
2200+
2201+        # To obey our optimality guarantee, we need to consider shares on
2202+        # readonly peers first, and separately from existing shares on
2203+        # peers we can write to.
2204+        readonly_edges = filter(lambda x: x[0] not in self._writable_servers,
2205+                                self._existing_edges)
2206+        writable_edges = filter(lambda x: x not in readonly_edges,
2207+                                self._existing_edges)
2208+
2209+        for edges in (readonly_edges, writable_edges, self._allocated_edges):
2210+            edges = filter(lambda x: x[0] not in peers_used_already, edges)
2211+            edges = filter(lambda x: x[1] in all_shares, edges)
2212+            edges_to_use = self._filter_edges(edges)
2213+
2214+            for (peer, share) in edges_to_use:
2215+                if peer not in peers_used_already:
2216+                    our_sharemap.setdefault(peer, set()).add(share)
2217+                    peers_used_already.add(peer)
2218+                    # this happens when a share exists on more than one peer.
2219+                    if share in all_shares: all_shares.remove(share)
2220+
2221+        # Pick happy peers that we know we can write to.
2222+        if not shallow:
2223+            peers_to_use = [x for x in self._writable_servers
2224+                            if x not in peers_used_already]
2225+            peers_to_use = peers_to_use[:len(all_shares)]
2226+
2227+            # Assign.
2228+            for peer in peers_to_use:
2229+                our_sharemap.setdefault(peer, set()).update(all_shares)
2230+
2231+
2232+        return our_sharemap
2233+
2234+
2235+    def _compute_happiness(self, shallow):
2236+        """
2237+        I compute the happiness of the current state of the graph that I
2238+        represent, if necessary, and record the resulting share assignment.
2239+        """
2240+        # build the sharemap.
2241+        sharemap = self._build_sharemap(shallow)
2242+
2243+        # compute which edges we're going to use.
2244+        edges_used = self._compute_matching(sharemap)
2245+
2246+        all_edges = self._assign_homeless_shares(edges_used)
2247+
2248+        # From these, remove any edges that we already know about
2249+        all_edges = set(all_edges)
2250+        all_edges.difference_update(self._existing_edges)
2251+
2252+        unused_allocations = self._allocated_edges.difference(all_edges)
2253+        self._obsolete_edges.update(unused_allocations)
2254+
2255+        all_edges.difference_update(self._allocated_edges)
2256+
2257+        self._edges_used = all_edges
2258+
2259+
2260+    def get_share_assignments(self, shallow=False):
2261+        self._compute_happiness(shallow)
2262+        self._needs_recomputation = False
2263+
2264+        result = {}
2265+        for (peer, share) in self._edges_used:
2266+            result.setdefault(peer, set()).add(share)
2267+        return result
2268+
2269+
2270+    def set_trackers(self, trackers):
2271+        self._peer_trackers = trackers
2272+
2273+
2274+    def get_trackers(self):
2275+        return self._peer_trackers
2276+
2277+
2278+    def is_healthy(self):
2279+        """
2280+        I return whether the share assignments I last returned to the
2281+        caller represent a healthy share distribution according to the
2282+        server of happiness health metric.
2283+        """
2284+        return (self._computed_happiness >= self._happy)
2285+
2286+
2287+    def needs_recomputation(self):
2288+        """
2289+        I return True if the share assignments last returned to my
2290+        caller are possibly stale, and False otherwise.
2291+        """
2292+        return self._needs_recomputation
2293+
2294+
2295+    def get_homeless_shares(self):
2296+        """
2297+        I return a set of the shares that are neither known to already
2298+        exist on the grid nor confirmed as allocated to a server.
2299+        """
2300+        all_shares = set(range(self._total_shares))
2301+
2302+        # Remove the shares that we know exist already.
2303+        existing_shares = set([x[1] for x in self._existing_edges])
2304+        all_shares.difference_update(existing_shares)
2305+
2306+        relevant_shares = set([x[1] for x in self._allocated_edges])
2307+        all_shares.difference_update(relevant_shares)
2308+
2309+        return all_shares
2310+
2311+
2312+    def get_failure_message(self):
2313+        """
2314+        I return a failure message suitable for use in an exception
2315+        message.
2316+        """
2317+        peers = [x[0] for x in self._edges_used]
2318+        peers.extend([x[0] for x in self._existing_edges])
2319+        peers = set(peers)
2320+
2321+        return failure_message(len(peers),
2322+                               self._needed_shares,
2323+                               self._happy,
2324+                               self._computed_happiness)
2325+
2326+
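# A rough sketch of how a caller might drive the methods above; the control
# flow and the use of a plain Exception are illustrative assumptions, not
# what the uploader in this patch actually does.
def sketch_select_and_place(graph):
    assignments = graph.get_share_assignments()
    if not graph.is_healthy():
        raise Exception(graph.get_failure_message())
    for server, shares in assignments.items():
        for share in shares:
            # A real caller would allocate a remote bucket here and, on
            # success, record it with
            # graph.confirm_share_allocation(server, share).
            pass
    if graph.needs_recomputation():
        # Something changed (e.g. a server was marked bad or full), so the
        # assignments above may be stale; ask again before proceeding.
        assignments = graph.get_share_assignments()
    return assignments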
2327+    def _compute_matching(self, sharemap):
2328+        """
2329+        I accept 'sharemap', a dict of peerid -> set(shareid) mappings. I
2330+        return the set of (peerid, shareid) edges used by a maximum matching.
2331+
2332+        To calculate the 'servers_of_happiness' number for the sharemap, I
2333+        construct a bipartite graph with servers in one partition of vertices
2334+        and shares in the other, and with an edge between a server s and a share t
2335+        if s is to store t. I then compute the size of a maximum matching in
2336+        the resulting graph; this is stored in self._computed_happiness as
2337+        the 'servers_of_happiness' for my arguments.
2338+
2339+        For example, consider the following layout:
2340+
2341+          server 1: shares 1, 2, 3, 4
2342+          server 2: share 6
2343+          server 3: share 3
2344+          server 4: share 4
2345+          server 5: share 2
2346+
2347+        From this, we can construct the following graph:
2348+
2349+          L = {server 1, server 2, server 3, server 4, server 5}
2350+          R = {share 1, share 2, share 3, share 4, share 6}
2351+          V = L U R
2352+          E = {(server 1, share 1), (server 1, share 2), (server 1, share 3),
2353+               (server 1, share 4), (server 2, share 6), (server 3, share 3),
2354+               (server 4, share 4), (server 5, share 2)}
2355+          G = (V, E)
2356+
2357+        Note that G is bipartite since every edge in E has one endpoint in L
2358+        and one endpoint in R.
2359+
2360+        A matching in a graph G is a subset M of E such that, for any vertex
2361+        v in V, v is incident to at most one edge of M. A maximum matching
2362+        in G is a matching that is no smaller than any other matching. For
2363+        this graph, a matching of cardinality 5 is:
2364+
2365+          M = {(server 1, share 1), (server 2, share 6),
2366+               (server 3, share 3), (server 4, share 4),
2367+               (server 5, share 2)}
2368+
2369+        Since G is bipartite and |L| = 5, we cannot have an M' such
2370+        that |M'| > |M|, so M is a maximum matching in G. Intuitively,
2371+        as long as k <= 5, we can see that the layout above has
2372+        servers_of_happiness = 5, which matches the result here.
2373+        """
2374+        if sharemap == {}:
2375+            self._computed_happiness = 0
2376+            return set([])
2377+
2378+        # graph is the flow network in adjacency-list form; peers and
2379+        # shares map vertex indices in that graph back to peerids and
2380+        # share numbers respectively.
2381+        graph, peers, shares = self.flow_network_for(sharemap)
2382+        # This is an implementation of the Ford-Fulkerson method for finding
2383+        # a maximum flow in a flow network applied to a bipartite graph.
2384+        # Specifically, it is the Edmonds-Karp algorithm, since it uses a
2385+        # BFS to find the shortest augmenting path at each iteration, if one
2386+        # exists.
2387+        #
2388+        # The implementation here is an adaptation of an algorithm described in
2389+        # "Introduction to Algorithms", Cormen et al, 2nd ed., pp 658-662.
2390+        dim = len(graph)
2391+        flow_function = [[0 for sh in xrange(dim)] for s in xrange(dim)]
2392+        residual_graph, residual_function = self.residual_network(graph,
2393+                                                                  flow_function)
2394+        while self.augmenting_path_for(residual_graph):
2395+            path = self.augmenting_path_for(residual_graph)
2396+            # Delta is the largest amount that we can increase flow across
2397+            # all of the edges in path. Because of the way that the residual
2398+            # function is constructed, f[u][v] for a particular edge (u, v)
2399+            # is the amount of unused capacity on that edge. Taking the
2400+            # minimum of a list of those values for each edge in the
2401+            # augmenting path gives us our delta.
2402+            delta = min(map(lambda (u, v): residual_function[u][v], path))
2403+            for (u, v) in path:
2404+                flow_function[u][v] += delta
2405+                flow_function[v][u] -= delta
2406+            residual_graph, \
2407+                residual_function = self.residual_network(graph, flow_function)
2408+        num_servers = len(sharemap)
2409+        # The value of a flow is the total flow out of the source vertex
2410+        # (vertex 0, in our graph). We could just as well sum across all of
2411+        # f[0], but we know that vertex 0 only has edges to the servers in
2412+        # our graph, so we can stop after summing flow across those. The
2413+        # value of a flow computed in this way is the size of a maximum
2414+        # matching on the bipartite graph described above.
2415+        happiness = sum([flow_function[0][v] for v in xrange(1, num_servers+1)])
2416+        self._computed_happiness = happiness
2417+
2418+        # Now we need to compute the edges that our matching used.
2419+        # Our maximum flow will place one unit of flow across each of
2420+        # the edges that end up getting used. By construction, we know
2421+        # that the first num_servers vertices after the source node
2422+        # are the servers, so we're going to look for those that have
2423+        # a unit of positive outflow.
2424+        #
2425+        # (we could just look for a negative flow to the source node,
2426+        #  but that wouldn't tell us anything about share assignments)
2427+        edges = set()
2428+
2429+        # Exclude the source node, include the server nodes, exclude the
2430+        # share nodes.
2431+        graph = flow_function[1:num_servers+1]
2432+        assert len(graph) == num_servers
2433+
2434+        for (i, x) in enumerate(graph):
2435+            # We're actually dealing with index i+1.
2436+            i += 1
2437+
2438+            if 1 in x:
2439+                # This node is getting used.
2440+                edges.add((i, x.index(1)))
2441+
2442+        assert len(edges) == self._computed_happiness
2443+
2444+        # Now transform the edges into actual edges.
2445+        edges = map(lambda (s, d): (peers[s], shares[d]), edges)
2446+        # The client can transform these into write buckets, if
2447+        # necessary.
2448+        return set(edges)
2449+
2450+
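# As a cross-check of the worked example in the docstring above, here is a
# self-contained maximum bipartite matching sketch (a simple augmenting-path
# search, not the flow-network code that this class actually uses). The
# layout and the expected answer of 5 come straight from the docstring.
def sketch_matching_size(layout):
    match = {}  # share -> server it is currently matched to
    def try_assign(server, seen):
        for share in layout[server]:
            if share in seen:
                continue
            seen.add(share)
            # Take the share if it is free, or if its current server can be
            # rematched to some other share.
            if share not in match or try_assign(match[share], seen):
                match[share] = server
                return True
        return False
    for server in layout:
        try_assign(server, set())
    return len(match)

sketch_layout = {
    "server 1": [1, 2, 3, 4],
    "server 2": [6],
    "server 3": [3],
    "server 4": [4],
    "server 5": [2],
}
assert sketch_matching_size(sketch_layout) == 5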
2451+    def flow_network_for(self, sharemap):
2452+        """
2453+        I take my argument, a dict of peerid -> set(shareid) mappings, and
2454+        turn it into a flow network suitable for use with Edmonds-Karp. I
2455+        then return the adjacency list representation of that network.
2456+
2457+        Specifically, I build G = (V, E), where:
2458+          V = { peerid in sharemap } U { shareid in sharemap } U {s, t}
2459+          E = {(s, peerid) for each peerid}
2460+              U {(peerid, shareid) if peerid is to store shareid }
2461+              U {(shareid, t) for each shareid}
2462+
2463+        s and t will be source and sink nodes when my caller starts treating
2464+        the graph I return like a flow network. Without s and t, the
2465+        returned graph is bipartite.
2466+        """
2467+        # Servers don't have integral identifiers, and we can't make any
2468+        # assumptions about the way shares are indexed -- it's possible that
2469+        # there are missing shares, for example. So before making a graph,
2470+        # we re-index so that all of our vertices have integral indices, and
2471+        # we re-index so that all of our vertices have integral indices and
2472+        # so that there aren't any holes. We start indexing at 1, so that we
2473+        sharemap, shares, peers = self.reindex(sharemap, base_index=1)
2474+        num_servers = len(sharemap)
2475+        num_shares = len(shares)
2476+        graph = [] # index -> [index], an adjacency list
2477+        # Add an entry at the top (index 0) that has an edge to every server
2478+        # in sharemap
2479+        graph.append(sharemap.keys())
2480+        # For each server, add an entry that has an edge to every share that it
2481+        # contains (or will contain).
2482+        for k in sharemap:
2483+            graph.append(sharemap[k])
2484+        # For each share, add an entry that has an edge to the sink.
2485+        sink_num = num_servers + num_shares + 1
2486+        for i in xrange(num_shares):
2487+            graph.append([sink_num])
2488+        # Add an empty entry for the sink, which has no outbound edges.
2489+        graph.append([])
2490+        return graph, peers, shares
2491+
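# To make the construction above concrete: for the 2-server, 3-share
# sharemap {A: set([0, 1]), B: set([1, 2])}, one possible adjacency list
# (vertex 0 = source, 1-2 = servers A and B, 3-5 = shares 0, 1 and 2,
# 6 = sink) is the hand-built illustration below; the exact indices depend
# on how reindex() happens to number things.
sketch_flow_network = [
    [1, 2],  # source -> every server
    [3, 4],  # server A -> shares 0 and 1
    [4, 5],  # server B -> shares 1 and 2
    [6],     # share 0 -> sink
    [6],     # share 1 -> sink
    [6],     # share 2 -> sink
    [],      # the sink has no outgoing edges
]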
2492+    def reindex(self, sharemap, base_index):
2493+        """
2494+        Given sharemap, I map peerids and shareids to integers that don't
2495+        conflict with each other, so they're useful as indices in a graph. I
2496+        return the reindexed sharemap, along with dicts mapping the new
2497+        vertex indices back to their shareids and peerids as a convenience
2498+        for my caller. base_index tells me where to start indexing.
2499+        """
2500+        shares  = {} # shareid => vertex index, then reversed and
2501+                     # returned.
2502+        peers = {} # vertex index -> peerid
2503+        num = base_index
2504+        ret = {} # peerid -> [shareid], a reindexed sharemap.
2505+        # Number the servers first
2506+        for k in sharemap:
2507+            ret[num] = sharemap[k]
2508+            peers[num] = k
2509+            num += 1
2510+        # Number the shares
2511+        for k in ret:
2512+            for shnum in ret[k]:
2513+                if shnum not in shares:
2514+                    shares[shnum] = num
2515+                    num += 1
2516+            ret[k] = map(lambda x: shares[x], ret[k])
2517+
2518+        # Reverse the sharemap so that we can use it later.
2519+        shares = dict([(n, s) for (s, n) in shares.items()])
2520+        return (ret, shares, peers)
2521+
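# A small worked illustration of the reindexing above (hypothetical peerids
# 'A' and 'B'; the exact numbers depend on dict iteration order): with
# base_index=1, the sharemap {'A': [0, 1], 'B': [1]} could come back as
sketch_ret = {1: [3, 4], 2: [4]}   # peer vertex -> [share vertex]
sketch_shares = {3: 0, 4: 1}       # share vertex -> original shareid
sketch_peers = {1: 'A', 2: 'B'}    # peer vertex -> original peerid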
2522+    def residual_network(self, graph, f):
2523+        """
2524+        I return the residual network and residual capacity function of the
2525+        flow network represented by my graph and f arguments. graph is a
2526+        flow network in adjacency-list form, and f is a flow in graph.
2527+        """
2528+        new_graph = [[] for i in xrange(len(graph))]
2529+        cf = [[0 for s in xrange(len(graph))] for sh in xrange(len(graph))]
2530+        for i in xrange(len(graph)):
2531+            for v in graph[i]:
2532+                if f[i][v] == 1:
2533+                    # We add an edge (v, i) with cf[v,i] = 1. This means
2534+                    # that we can remove 1 unit of flow from the edge (i, v)
2535+                    new_graph[v].append(i)
2536+                    cf[v][i] = 1
2537+                    cf[i][v] = -1
2538+                else:
2539+                    # We add the edge (i, v), since we're not using it right
2540+                    # now.
2541+                    new_graph[i].append(v)
2542+                    cf[i][v] = 1
2543+                    cf[v][i] = -1
2544+        return (new_graph, cf)
2545+
2546+    def augmenting_path_for(self, graph):
2547+        """
2548+        I return an augmenting path, if there is one, from the source node
2549+        to the sink node in the flow network represented by my graph argument.
2550+        If there is no augmenting path, I return False. I assume that the
2551+        source node is at index 0 of graph, and the sink node is at the last
2552+        index. I also assume that graph is a flow network in adjacency list
2553+        form.
2554+        """
2555+        bfs_tree = self.bfs(graph, 0)
2556+        if bfs_tree[len(graph) - 1]:
2557+            n = len(graph) - 1
2558+            path = [] # [(u, v)], where u and v are vertices in the graph
2559+            while n != 0:
2560+                path.insert(0, (bfs_tree[n], n))
2561+                n = bfs_tree[n]
2562+            return path
2563+        return False
2564+
2565+    def bfs(self, graph, s):
2566+        """
2567+        Perform a BFS on graph starting at s, where graph is a graph in
2568+        adjacency list form, and s is a node in graph. I return the
2569+        predecessor table that the BFS generates.
2570+        """
2571+        # This is an adaptation of the BFS described in "Introduction to
2572+        # Algorithms", Cormen et al, 2nd ed., p. 532.
2573+        # WHITE vertices are those that we haven't seen or explored yet.
2574+        WHITE = 0
2575+        # GRAY vertices are those we have seen, but haven't explored yet
2576+        GRAY  = 1
2577+        # BLACK vertices are those we have seen and explored
2578+        BLACK = 2
2579+        color        = [WHITE for i in xrange(len(graph))]
2580+        predecessor  = [None for i in xrange(len(graph))]
2581+        distance     = [-1 for i in xrange(len(graph))]
2582+        queue = [s] # vertices that we haven't explored yet.
2583+        color[s] = GRAY
2584+        distance[s] = 0
2585+        while queue:
2586+            n = queue.pop(0)
2587+            for v in graph[n]:
2588+                if color[v] == WHITE:
2589+                    color[v] = GRAY
2590+                    distance[v] = distance[n] + 1
2591+                    predecessor[v] = n
2592+                    queue.append(v)
2593+            color[n] = BLACK
2594+        return predecessor
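# To illustrate how the predecessor table above becomes the augmenting path
# used by augmenting_path_for, here is a small sketch that mirrors its path
# extraction. The predecessor list is the one a BFS would produce for the
# hand-built network shown after flow_network_for (source 0, sink 6, with
# the sink reached via 0 -> 1 -> 3 -> 6).
def sketch_path_from_predecessors(predecessor, sink):
    path = []
    n = sink
    while n != 0:
        path.insert(0, (predecessor[n], n))
        n = predecessor[n]
    return path

assert (sketch_path_from_predecessors([None, 0, 0, 1, 1, 2, 3], 6)
        == [(0, 1), (1, 3), (3, 6)])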
2595}
2596[test/test_upload: Add tests for HappinessGraph object.
2597Kevan Carstensen <kevan@isnotajoke.com>**20110503035009
2598 Ignore-this: b2a290bd0d5e2d71d83f01cee6c55c68
2599] {
2600hunk ./src/allmydata/test/test_upload.py 17
2601 from allmydata.util import log
2602 from allmydata.util.assertutil import precondition
2603 from allmydata.util.deferredutil import DeferredListShouldSucceed
2604+from allmydata.util.happinessutil import HappinessGraph
2605 from allmydata.test.no_network import GridTestMixin
2606 from allmydata.test.common_util import ShouldFailMixin
2607hunk ./src/allmydata/test/test_upload.py 20
2608-from allmydata.util.happinessutil import servers_of_happiness, \
2609-                                         shares_by_server, merge_servers
2610 from allmydata.storage_client import StorageFarmBroker
2611 from allmydata.storage.server import storage_index_to_dir
2612 
2613hunk ./src/allmydata/test/test_upload.py 450
2614         return d
2615 
2616     def test_second_error_all(self):
2617+        # XXX: This test is stupid and wrong; fix it.
2618         self.make_node("second-fail")
2619         self.set_encoding_parameters(k=25, happy=1, n=100)
2620         # The upload won't ask any peer to allocate shares more than once on
2621hunk ./src/allmydata/test/test_upload.py 1906
2622         f.close()
2623         return None
2624 
2625+class HappinessGraphTests(unittest.TestCase, ShouldFailMixin):
2626+    """
2627+    Tests for the HappinessGraph construct, which handles validation and
2628+    revalidation based on what we've seen on the grid.
2629+    """
2630+    def make_servers_to(self, n=0):
2631+        """
2632+        Return a list of n placeholder server names, server0 .. server(n-1).
2633+        """
2634+        return ["server%d" % i  for i in xrange(n)]
2635+
2636+
2637+    def test_add_servers_with_excess_shares(self):
2638+        """
2639+        Test the add_servers method in HappinessGraph when there are more
2640+        shares than there are writable servers.
2641+        """
2642+        # First, test the case where there are fewer servers than shares.
2643+        # We should have happiness equal to the number of servers.
2644+        servers = self.make_servers_to(6)
2645+
2646+        self.g = HappinessGraph(3, 7, 10)
2647+        self.g.add_servers(servers)
2648+        self.g.get_share_assignments()
2649+
2650+        self.failUnlessEqual(self.g._computed_happiness, len(servers))
2651+        self.failIf(self.g.is_healthy())
2652+
2653+
2654+    def test_add_servers_with_excess_servers(self):
2655+        """
2656+        Test the add_servers method in HappinessGraph when there are more
2657+        writable servers than there are shares.
2658+        """
2659+        servers = self.make_servers_to(15)
2660+        self.g = HappinessGraph(3, 7, 10)
2661+        self.g.add_servers(servers)
2662+        self.g.get_share_assignments()
2663+
2664+        self.failUnlessEqual(self.g._computed_happiness, 10)
2665+        self.failUnless(self.g.is_healthy())
2666+
2667+
2668+    def test_add_server_with_share(self):
2669+        """
2670+        HappinessGraph.add_server_with_share adds a single edge to its
2671+        internal graph representation. It is used by client code that
2672+        has found a single existing share that it wishes to add to the
2673+        graph. This is in contrast to the idea of representing possible
2674+        share assignments by adding a complete graph; we don't want to do
2675+        that for read-only servers, since we don't know that we can assign
2676+        new shares to them.
2677+        """
2678+        self.g = HappinessGraph(3, 8, 10)
2679+        added_count = 0
2680+        for (sv, sh) in zip(self.make_servers_to(7), range(7)):
2681+            self.g.add_server_with_share(sv, sh)
2682+            added_count += 1
2683+            self.g.get_share_assignments()
2684+
2685+            self.failIf(self.g.is_healthy())
2686+            self.failUnlessEqual(self.g._computed_happiness, added_count)
2687+
2688+        self.g.add_server_with_share("server8", 7)
2689+        self.g.get_share_assignments()
2690+        self.failUnless(self.g.is_healthy())
2691+        self.failUnlessEqual(self.g._computed_happiness, added_count + 1)
2692+
2693+
2694+    def test_add_server_with_share_duplicate_share(self):
2695+        """
2696+        HappinessGraph shouldn't overcount happiness in the case where a
2697+        share is found on more than one server.
2698+        """
2699+        self.g = HappinessGraph(3, 8, 10)
2700+
2701+        for server in self.make_servers_to(6):
2702+            self.g.add_server_with_share(server, 1)
2703+            self.g.get_share_assignments()
2704+
2705+            self.failIf(self.g.is_healthy())
2706+            self.failUnlessEqual(self.g._computed_happiness, 1)
2707+
2708+
2709+    def test_add_server_with_share_multiple_shares_on_server(self):
2710+        """
2711+        HappinessGraph shouldn't overcount happiness in the case where a
2712+        server has more than one share.
2713+        """
2714+        self.g = HappinessGraph(3, 8, 10)
2715+
2716+        for share in range(6):
2717+            self.g.add_server_with_share("server1", share)
2718+            self.g.get_share_assignments()
2719+
2720+            self.failIf(self.g.is_healthy())
2721+            self.failUnlessEqual(self.g._computed_happiness, 1)
2722+
2723+
2724+    def test_add_server_with_share_after_assignments(self):
2725+        """
2726+        add_server_with_share should interact in a useful way with the
2727+        existing share assignments after we've made them.
2728+        """
2729+        self.g = HappinessGraph(3, 5, 8)
2730+
2731+        servers = self.make_servers_to(6)
2732+        shares = [i for i in xrange(6)]
2733+
2734+        self.g.add_servers(servers)
2735+        self.g.get_share_assignments()
2736+        self.failUnless(self.g.is_healthy())
2737+        self.failUnlessEqual(self.g._computed_happiness, 6)
2738+
2739+        # Now add some edges. We'll add duplicate edges, which shouldn't
2740+        # increase happiness any.
2741+        edges = zip(servers, shares)
2742+        for (server, share) in edges:
2743+            self.g.add_server_with_share(server, share)
2744+            self.g.get_share_assignments()
2745+
2746+            self.failUnless(self.g.is_healthy())
2747+            self.failUnlessEqual(self.g._computed_happiness, 6)
2748+
2749+        # Now add some new edges, which should increase happiness.
2750+        self.g.add_server_with_share("server6", 6)
2751+        self.g.get_share_assignments()
2752+        self.failUnless(self.g.is_healthy())
2753+        self.failUnlessEqual(self.g._computed_happiness, 7)
2754+
2755+        self.g.add_server_with_share("server7", 7)
2756+        self.g.get_share_assignments()
2757+        self.failUnless(self.g.is_healthy())
2758+        self.failUnlessEqual(self.g._computed_happiness, 8)
2759+
2760+
2761+    def test_mark_full_server(self):
2762+        """
2763+        Marking a full server should mean that no new shares get assigned
2764+        to that server, but that existing shares added with
2765+        add_server_with_share are still counted.
2766+        """
2767+        self.g = HappinessGraph(3, 5, 6)
2768+
2769+        servers = self.make_servers_to(6)
2770+        shares = [i for i in xrange(6)]
2771+
2772+        self.g.add_servers(servers)
2773+        self.g.get_share_assignments()
2774+
2775+        self.failUnless(self.g.is_healthy())
2776+        self.failUnlessEqual(self.g._computed_happiness, 6)
2777+
2778+        # Now mark a couple of servers full, and verify that the happiness
2779+        # score decreases accordingly.
2780+        self.g.mark_full_server("server5")
2781+        self.g.get_share_assignments()
2782+
2783+        self.failUnless(self.g.is_healthy())
2784+        self.failUnlessEqual(self.g._computed_happiness, 5)
2785+
2786+        self.g.mark_full_server("server4")
2787+        self.g.get_share_assignments()
2788+        self.failIf(self.g.is_healthy())
2789+        self.failUnlessEqual(self.g._computed_happiness, 4)
2790+
2791+
2792+    def test_mark_bad_server(self):
2793+        """
2794+        If we mark a server bad, we should see existing shares on that
2795+        server vanish.
2796+        """
2797+        self.g = HappinessGraph(3, 5, 6)
2798+
2799+        servers = self.make_servers_to(6)
2800+        self.g.add_servers(servers)
2801+
2802+        # Now make a couple of additional servers and say that they have some shares.
2803+        self.g.add_server_with_share("server6", 4)
2804+        self.g.add_server_with_share("server7", 5)
2805+
2806+        # We have all of these shares in the graph already, so happiness
2807+        # should be 6.
2808+        self.g.get_share_assignments()
2809+        self.failUnless(self.g.is_healthy())
2810+        self.failUnlessEqual(self.g._computed_happiness, 6)
2811+
2812+        # Now mark one of those servers bad. Nothing should change.
2813+        self.g.mark_bad_server("server6")
2814+        self.g.get_share_assignments()
2815+        self.failUnless(self.g.is_healthy())
2816+        self.failUnlessEqual(self.g._computed_happiness, 6)
2817+
2818+        # Now mark one of the initially-added servers bad, along with server7.
2819+        self.g.mark_bad_server("server0")
2820+        self.g.mark_bad_server("server7")
2821+        assignments =  self.g.get_share_assignments()
2822+
2823+        for k in assignments:
2824+            self.failIfEqual(k, "server0")
2825+
2826+
2827+    def test_mark_nonexistent_server(self):
2828+        """
2829+        Marking a nonexistent server bad shouldn't do anything.
2830+        """
2831+        self.g = HappinessGraph(3, 5, 6)
2832+
2833+        servers = self.make_servers_to(6)
2834+        self.g.add_servers(servers)
2835+        self.g.get_share_assignments()
2836+        self.failUnless(self.g.is_healthy())
2837+        self.failUnlessEqual(self.g._computed_happiness, 6)
2838+
2839+        # Now try marking bad a server that isn't there; nothing should
2840+        # happen in this case.
2841+        self.g.mark_bad_server("server7")
2842+        self.g.get_share_assignments()
2843+        self.failUnlessEqual(self.g._computed_happiness, 6)
2844+
2845+
2846+    def test_compute_happiness(self):
2847+        # The happiness of an unused graph should be zero.
2848+        self.g = HappinessGraph(3, 5, 10)
2849+
2850+        self.g.get_share_assignments()
2851+        self.failIf(self.g.is_healthy())
2852+        self.failUnlessEqual(self.g._computed_happiness, 0)
2853+
2854+        self.g.add_server_with_share("server1", 1)
2855+        self.g.add_server_with_share("server2", 2)
2856+        self.g.add_server_with_share("server3", 3)
2857+        self.g.add_server_with_share("server4", 4)
2858+        self.g.get_share_assignments()
2859+
2860+        self.failUnlessEqual(self.g._computed_happiness, 4)
2861+
2862+        self.g.mark_bad_server("server4")
2863+        self.g.add_server_with_share("server1", 4)
2864+        self.g.get_share_assignments()
2865+        # We've dropped server4 and put its share on server1, which already
2866+        # has one, so now servers_of_happiness should be 3 instead of 4.
2867+        self.failUnlessEqual(self.g._computed_happiness, 3)
2868+
2869+        # Let's add four more servers with no overlap between themselves
2870+        # or the (server,share) tuples already in the matching.
2871+        for (server, share) in [("server%d" % i, i) for i in xrange(5, 9)]:
2872+            self.g.add_server_with_share(server, share)
2873+
2874+        self.g.get_share_assignments()
2875+        self.failUnlessEqual(self.g._computed_happiness, 7)
2876+
2877+        # Try adding a redundant server and share; observe that it does
2878+        # not affect the matching.
2879+        self.g.add_server_with_share("server1", 1)
2880+        self.g.get_share_assignments()
2881+        self.failUnlessEqual(self.g._computed_happiness, 7)
2882+
2883+        # Test a more substantial overlap between the trackers and the
2884+        # existing assignments.
2885+        edges = [
2886+            ("server1", 1),
2887+            ("server2", 2),
2888+            ("server3", 3),
2889+            ("server4", 4),
2890+            ("server5", 4),
2891+            ("server6", 3),
2892+            ("server6", 5),
2893+        ]
2894+        self.g = HappinessGraph(3, 5, 6)
2895+        for (server, share) in edges: self.g.add_server_with_share(server, share)
2896+
2897+        # The computed happiness is the size of a maximum matching in
2898+        # the bipartite graph that the HappinessGraph builds between
2899+        # serverids and share numbers. It should find something like
2900+        # this:
2901+        # (server 1, share 1)
2902+        # (server 2, share 2)
2903+        # (server 3, share 3)
2904+        # (server 5, share 4)
2905+        # (server 6, share 5)
2906+        #
2907+        # and, since there are 5 edges in this matching, the computed
2908+        # happiness should be 5.
2909+        self.g.get_share_assignments()
2910+        self.failUnlessEqual(self.g._computed_happiness, 5)
2911+
2912+        # Now mark server 6 bad, and watch the happiness drop to 4.
2913+        self.g.mark_bad_server("server6")
2914+        self.g.get_share_assignments()
2915+        self.failUnlessEqual(self.g._computed_happiness, 4)
2916+
2917+        # Now mark server 4 bad, and watch happiness stay where it is.
2918+        self.g.mark_bad_server("server4")
2919+        self.g.get_share_assignments()
2920+        self.failUnlessEqual(self.g._computed_happiness, 4)
2921+
2922+        # Zooko's first puzzle:
2923+        # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:156)
2924+        #
2925+        # server 1: shares 0, 1
2926+        # server 2: shares 1, 2
2927+        # server 3: share 2
2928+        #
2929+        # This should yield happiness of 3.
2930+        self.g = HappinessGraph(3, 3, 6)
2931+
2932+        edges = [
2933+            ("server1", 0),
2934+            ("server1", 1),
2935+            ("server2", 1),
2936+            ("server2", 2),
2937+            ("server3", 2)
2938+        ]
2939+        for (server, share) in edges: self.g.add_server_with_share(server, share)
2940+
2941+        self.g.get_share_assignments()
2942+        self.failUnlessEqual(3, self.g._computed_happiness)
2943+
2944+        # Zooko's second puzzle:
2945+        # (from http://allmydata.org/trac/tahoe-lafs/ticket/778#comment:158)
2946+        #
2947+        # server 1: shares 0, 1
2948+        # server 2: share 1
2949+        #
2950+        # This should yield happiness of 2.
2951+        self.g = HappinessGraph(3, 3, 6)
2952+
2953+        edges = [
2954+            ("server1", 0),
2955+            ("server1", 1),
2956+            ("server2", 1),
2957+        ]
2958+        for (server, share) in edges: self.g.add_server_with_share(server, share)
2959+
2960+        self.g.get_share_assignments()
2961+        self.failUnlessEqual(2, self.g._computed_happiness)
2962+
2963+
2964+    def test_shallow_compute_happiness(self):
2965+        """
2966+        Shallow happiness computations depend only on the edges that
2967+        correspond to already existing share placements. These tell us
2968+        whether we're likely to be able to continue an upload without
2969+        resulting in unhappiness after we've started it.
2970+        """
2971+        # Make more servers than shares. This means that we'll have one
2972+        # server with no shares assigned to it.
2973+        self.g = HappinessGraph(3, 5, 5)
2974+        servers = self.make_servers_to(6)
2975+        self.g.add_servers(servers)
2976+        results = self.g.get_share_assignments()
2977+
2978+        self.failUnlessEqual(self.g._computed_happiness, 5)
2979+
2980+        # Now we tell the graph that we managed to allocate space for
2981+        # all of these shares.
2982+        for k in results:
2983+            for s in results[k]:
2984+                self.g.add_server_with_share(k, s)
2985+
2986+        # Now imagine that our upload has started, and that one of the
2987+        # servers that we're working with is broken. Uh oh. Remove that
2988+        # server from the list.
2989+        s = [k for k in results][0]
2990+        self.g.mark_bad_server(s)
2991+
2992+        # Now recompute. At this point we just want the upload to die
2993+        # gracefully if it is unhappy, since that is less work than
2994+        # recomputing the matching, reallocating buckets, and so on on
2995+        # the fly. Since we only know about 5 edges in our graph, and
2996+        # we've just removed one of them, the result should be four,
2997+        # even though we could theoretically go back, ask the remaining
2998+        # server to hold some share, and try again.
2999+        # (that's probably what we will do, though after starting the
3000+        #  upload over again)
3001+        self.g.get_share_assignments(shallow=True)
3002+        self.failUnlessEqual(self.g._computed_happiness, 4)
3003+
3004+
3005+    def test_get_share_assignments(self):
3006+        """
3007+        The HappinessGraph needs to be able to tell us about share
3008+        assignments correctly so that we know where to put things.
3009+        """
3010+        self.g = HappinessGraph(3, 5, 10)
3011+        servers = self.make_servers_to(6)
3012+        shares = [i for i in xrange(6)]
3013+        edges = zip(servers, shares)
3014+
3015+        for (server, share) in edges:
3016+            self.g.add_server_with_share(server, share)
3017+
3018+        assignments = self.g.get_share_assignments()
3019+        self.failUnlessEqual(self.g._computed_happiness, 6)
3020+        # All of these shares are existing, so we have no work to do.
3021+        self.failUnlessEqual(assignments, {})
3022+
3023+        more_servers = self.make_servers_to(10)
3024+        self.g.add_servers(more_servers)
3025+
3026+        assignments = self.g.get_share_assignments()
3027+
3028+        self.failUnlessEqual(self.g._computed_happiness, 10)
3029+
3030+        all_shares = range(10)
3031+
3032+        for server in assignments:
3033+            for share in assignments[server]:
3034+                self.failUnlessIn(server, more_servers)
3035+                self.failUnlessIn(share, all_shares)
3036+                more_servers.remove(server)
3037+                all_shares.remove(share)
3038+
3039+
3040+    def test_needs_recomputation(self):
3041+        """
3042+        HappinessGraph instances should be smart enough to know when
3043+        they need to recompute happiness and when they can return the
3044+        value that they've already computed.
3045+        """
3046+        self.g = HappinessGraph(3, 5, 10)
3047+        servers = self.make_servers_to(6)
3048+
3049+        self.g.add_servers(servers)
3050+        self.failUnless(self.g.needs_recomputation())
3051+
3052+        self.g.get_share_assignments()
3053+        self.failIf(self.g.needs_recomputation())
3054+
3055+        # Now add a server, and verify that happiness needs to be recomputed.
3056+        self.g.add_server_with_share("server7", 7)
3057+        self.failUnless(self.g.needs_recomputation())
3058+        self.g.get_share_assignments()
3059+        self.failIf(self.g.needs_recomputation())
3060+
3061+        # Since this server has an existing share, we shouldn't need to
3062+        # recompute the graph for it if it's full.
3063+        self.g.mark_full_server("server7")
3064+        self.failIf(self.g.needs_recomputation())
3065+
3066+        self.g.mark_bad_server("server7")
3067+        self.failUnless(self.g.needs_recomputation())
3068+        self.g.get_share_assignments()
3069+        self.failIf(self.g.needs_recomputation())
3070+
3071+        # Now mark a nonexistent server bad, and verify that recomputation
3072+        # isn't called for.
3073+        self.g.mark_bad_server("serverfake")
3074+        self.failIf(self.g.needs_recomputation())
3075+
3076+
3077+    def test_assignment_freshness(self):
3078+        self.g = HappinessGraph(3, 5, 7)
3079+
3080+        servers = self.make_servers_to(6)
3081+        self.g.add_servers(servers)
3082+        self.g.get_share_assignments()
3083+
3084+        self.failUnlessEqual(self.g._computed_happiness, 6)
3085+
3086+        self.g.add_server_with_share("server7", 6)
3087+        self.g.get_share_assignments()
3088+        self.failUnlessEqual(self.g._computed_happiness, 7)
3089+
3090+
3091+    def test_assignment_server_permutation(self):
3092+        """
3093+        If we give the HappinessGraph instance an ordered list of servers
3094+        and there are more servers than it needs to make a happy share
3095+        assignment, it should distribute shares over the earlier servers
3096+        before distributing them to the later servers.
3097+        """
3098+        self.g = HappinessGraph(3, 7, 10)
3099+
3100+        servers = self.make_servers_to(15)
3101+        self.g.add_servers(servers)
3102+
3103+        assignments = self.g.get_share_assignments()
3104+        self.failUnless(self.g.is_healthy())
3105+        self.failUnlessEqual(self.g._computed_happiness, 10)
3106+
3107+        expected_use_servers = list(servers)[:10]
3108+        for k in assignments:
3109+            self.failUnlessIn(k, expected_use_servers)
3110+
3111+
3112+    def test_confirm_share_placement(self):
3113+        # The HappinessGraph won't start counting edges associated with
3114+        # allocated shares until we've confirmed that those have been
3115+        # placed.
3116+        self.g = HappinessGraph(3, 7, 10)
3117+        servers = self.make_servers_to(10)
3118+        self.g.add_servers(servers)
3119+
3120+        assignments = self.g.get_share_assignments()
3121+        self.failUnlessEqual(len(assignments), 10)
3122+        self.failUnless(self.g.is_healthy())
3123+
3124+        # Now do a shallow happiness computation. We shouldn't get any
3125+        # assignments back, since we haven't given the HappinessGraph
3126+        # any indication that we've successfully allocated anything.
3127+        assignments = self.g.get_share_assignments(shallow=True)
3128+        self.failUnlessEqual(len(assignments), 0)
3129+        self.failIf(self.g.is_healthy())
3130+
3131+        # Now confirm a few shares.
3132+        assignments = self.g.get_share_assignments()
3133+
3134+        for server in servers[:3]:
3135+            for share in assignments[server]:
3136+                self.g.confirm_share_allocation(server, share)
3137+
3138+
3139+        # and check that they show up in subsequent assignments.
3140+        assignments = self.g.get_share_assignments(shallow=True)
3141+        self.failUnlessEqual(len(assignments), 3)
3142+        self.failIf(self.g.is_healthy())
3143+
3144+
3145+    def test_obsolete_allocations_with_confirmed_placements(self):
3146+        # The HappinessGraph keeps track of all of the allocations it's
3147+        # made over the course of server selection. In some cases, these
3148+        # allocations can become obsolete -- for example, if information
3149+        # found when allocating shares reveals additional shares on
3150+        # additional servers such that fewer shares actually need to be
3151+        # allocated to adequately distribute a file. The
3152+        # get_obsolete_allocations method tells the uploader about these
3153+        # so it can cancel them appropriately.
3154+        self.g = HappinessGraph(3, 7, 10)
3155+
3156+        servers = self.make_servers_to(10)
3157+        self.g.add_servers(servers)
3158+
3159+        assignments = self.g.get_share_assignments()
3160+        self.failUnless(self.g.is_healthy())
3161+
3162+        # Now we'll confirm all of these placements.
3163+        for server in assignments:
3164+            for share in assignments[server]:
3165+                self.g.confirm_share_allocation(server, share)
3166+
3167+        # We'll also pretend that we've found share 1 on a server.
3168+        self.g.add_server_with_share("server1", 1)
3169+
3170+        # Now we'll attempt to get share assignments again.
3171+        assignments = self.g.get_share_assignments()
3172+        self.failUnlessEqual(self.g._computed_happiness, 10)
3173+
3174+        obsolete = self.g.get_obsolete_allocations()
3175+        self.failUnlessEqual(len(obsolete), 2)
3176+
3177+
3178+    def test_obsolete_allocations_with_unconfirmed_placements(self):
3179+        # The HappinessGraph should not care about shares which have
3180+        # been allocated but which have not been placed by an uploader.
3181+        # These should not be reported by get_obsolete_allocations.
3182+        self.g = HappinessGraph(3, 7, 10)
3183+
3184+        servers = self.make_servers_to(10)
3185+        self.g.add_servers(servers)
3186+
3187+        assignments = self.g.get_share_assignments()
3188+
3189+        # Don't confirm any of these placements, but find another share.
3190+        self.g.add_server_with_share("server1", 1)
3191+
3192+        # then recompute.
3193+        assignments = self.g.get_share_assignments()
3194+        self.failUnlessEqual(self.g._computed_happiness, 10)
3195+
3196+        obsolete = self.g.get_obsolete_allocations()
3197+        # We never confirmed any allocations, so there shouldn't be any
3198+        # obsolete allocations.
3199+        self.failUnlessEqual(len(obsolete), 0)
3200+
3201+
3202+    def test_allocated_reuse(self):
3203+        # If we have to redo share placement after already allocating
3204+        # some shares to some servers, we should make sure to re-use
3205+        # those allocations if possible in order to reduce the number of
3206+        # network round trips.
3207+        self.g = HappinessGraph(3, 7, 10)
3208+
3209+        servers = self.make_servers_to(10)
3210+        self.g.add_servers(servers)
3211+
3212+        assignments = self.g.get_share_assignments()
3213+
3214+        # Confirm all of the share assignments.
3215+        for server in assignments:
3216+            for share in assignments[server]:
3217+                self.g.confirm_share_allocation(server, share)
3218+
3219+        # We need to break the allocation somehow. We'll do that by
3220+        # finding a share on the server "server1" that wasn't assigned
3221+        # to it.
3222+        all_shares = set([i for i in xrange(10)])
3223+        all_shares.difference_update(assignments["server1"])
3224+        share = list(all_shares)[0]
3225+
3226+        self.g.add_server_with_share("server1", share)
3227+
3228+        # And go again.
3229+        assignments = self.g.get_share_assignments()
3230+
3231+        # We should have had to allocate one more share to deal with
3232+        # that.
3233+        self.failUnlessEqual(len(assignments), 1)
3234+        self.failUnlessEqual(self.g._computed_happiness, 10)
3235+
3236+        # And, if we check for obsolete allocations, we should have two
3237+        # wasted allocations.
3238+        self.failUnlessEqual(len(self.g.get_obsolete_allocations()), 2)
3239+
3240+
3241+    def test_existing_reuse(self):
3242+        # The HappinessGraph should attempt to use existing shares
3243+        # whenever possible to reduce network traffic.
3244+        self.g = HappinessGraph(3, 7, 10)
3245+
3246+        servers = self.make_servers_to(10)
3247+
3248+        self.g.add_servers(servers)
3249+        self.g.add_server_with_share("server0", 0)
3250+        self.g.add_server_with_share("server1", 1)
3251+        self.g.add_server_with_share("server2", 2)
3252+
3253+        # Now get share assignments. We should have 7 assignments.
3254+        assignments = self.g.get_share_assignments()
3255+        self.failUnless(self.g.is_healthy())
3256+        self.failUnlessEqual(self.g._computed_happiness, 10)
3257+        self.failUnlessEqual(len(assignments), 7)
3258+
3259+
3260+    def test_homeless_share_efficiency(self):
3261+        # When we're dealing with an inefficient layout, the
3262+        # HappinessGraph object should be smart enough to
3263+        # use the existing shares when deciding on where to place
3264+        # things.
3265+        self.g = HappinessGraph(1, 2, 10)
3266+
3267+        # Add two servers, and say that between the two of them they
3268+        # have all of the shares. We shouldn't have to allocate any
3269+        # shares to satisfy the upload.
3270+        servers = self.make_servers_to(2)
3271+        self.g.add_servers(servers)
3272+
3273+        for i in (1, 3, 4, 8, 9):
3274+            self.g.add_server_with_share("server1", i)
3275+
3276+        for i in (0, 2, 5, 6, 7):
3277+            self.g.add_server_with_share("server0", i)
3278+
3279+        # Now get share assignments. There shouldn't be any share
3280+        # assignments.
3281+        assignments = self.g.get_share_assignments()
3282+        self.failUnless(self.g.is_healthy())
3283+        self.failUnlessEqual(len(assignments), 0)
3284+
3285+
3286+    def test_conservatism_with_replication(self):
3287+        # The old happiness logic was too conservative when k = 1. The
3288+        # HappinessGraph shouldn't be.
3289+        self.g = HappinessGraph(1, 10, 10)
3290+
3291+        servers = self.make_servers_to(10)
3292+        self.g.add_servers(servers)
3293+
3294+        assignments = self.g.get_share_assignments()
3295+        self.failUnless(self.g.is_healthy())
3296+        self.failUnlessEqual(len(assignments), 10)
3297+
3298+
3299+    def test_happinessgraph_homeless_readonly(self):
3300+        # Servers with shares already on them shouldn't be assigned new
3301+        # shares if they're not writable.
3302+        self.g = HappinessGraph(1, 2, 4)
3303+        servers = self.make_servers_to(1)
3304+        self.g.add_servers(servers)
3305+
3306+        self.g.add_server_with_share("server2", 2)
3307+        assignments = self.g.get_share_assignments()
3308+
3309+        # We should have no homeless shares, but we should have 3
3310+        # allocations going to server 0.
3311+        self.failUnlessEqual(len(assignments), 1)
3312+        self.failUnless(self.g.is_healthy())
3313+        self.failUnlessIn("server0", assignments)
3314+        self.failUnlessEqual(len(assignments["server0"]), 3)
3315+
3316+
3317+        # Servers with shares already on them should be assigned
3318+        # homeless shares if they're writable.
3319+        # homeless shares if they're writable.
3320+        self.g = HappinessGraph(1, 2, 4)
3321+        servers = self.make_servers_to(2)
3322+        self.g.add_servers(servers)
3323+
3324+        self.g.add_server_with_share("server1", 1)
3325+        assignments = self.g.get_share_assignments()
3326+
3327+        # We should have at least one assignment for each of these
3328+        # servers.
3329+        self.failUnlessEqual(len(assignments), 2)
3330+        self.failUnlessIn("server1", assignments)
3331+        self.failUnlessEqual(len(assignments["server1"]), 1)
3332+        self.failUnlessIn("server0", assignments)
3333+        self.failUnlessEqual(len(assignments["server0"]), 2)
3334+
3335+
3336 # TODO:
3337 #  upload with exactly 75 servers (shares_of_happiness)
3338 #  have a download fail
3339}
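
For reference, a minimal sketch of the HappinessGraph API as the tests above
exercise it. The constructor arguments appear to be (k, happy, n), the server
names are placeholders, and the control flow is illustrative rather than a
transcript of the uploader's logic:

    from allmydata.util.happinessutil import HappinessGraph

    g = HappinessGraph(3, 7, 10)                        # k=3, happy=7, n=10
    g.add_servers(["server%d" % i for i in range(10)])  # writable servers
    g.add_server_with_share("server0", 0)               # a share found on the grid
    assignments = g.get_share_assignments()             # {server: set(shnums)}
    if not g.is_healthy():
        print(g.get_failure_message())
    for server in assignments:
        for share in assignments[server]:
            # after successfully allocating a remote bucket:
            g.confirm_share_allocation(server, share)
    g.mark_bad_server("server3")                        # e.g. the server went away
    if g.needs_recomputation():
        assignments = g.get_share_assignments()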
3340
3341Context:
3342
3343[docs: FTP-and-SFTP.rst: fix a minor error and update the information about which version of Twisted fixes #1297
3344zooko@zooko.com**20110428055232
3345 Ignore-this: b63cfb4ebdbe32fb3b5f885255db4d39
3346] 
3347[munin tahoe_files plugin: fix incorrect file count
3348francois@ctrlaltdel.ch**20110428055312
3349 Ignore-this: 334ba49a0bbd93b4a7b06a25697aba34
3350 fixes #1391
3351] 
3352[corrected "k must never be smaller than N" to "k must never be greater than N"
3353secorp@allmydata.org**20110425010308
3354 Ignore-this: 233129505d6c70860087f22541805eac
3355] 
3356[Fix a test failure in test_package_initialization on Python 2.4.x due to exceptions being stringified differently than in later versions of Python. refs #1389
3357david-sarah@jacaranda.org**20110411190738
3358 Ignore-this: 7847d26bc117c328c679f08a7baee519
3359] 
3360[tests: add test for including the ImportError message and traceback entry in the summary of errors from importing dependencies. refs #1389
3361david-sarah@jacaranda.org**20110410155844
3362 Ignore-this: fbecdbeb0d06a0f875fe8d4030aabafa
3363] 
3364[allmydata/__init__.py: preserve the message and last traceback entry (file, line number, function, and source line) of ImportErrors in the package versions string. fixes #1389
3365david-sarah@jacaranda.org**20110410155705
3366 Ignore-this: 2f87b8b327906cf8bfca9440a0904900
3367] 
3368[remove unused variable detected by pyflakes
3369zooko@zooko.com**20110407172231
3370 Ignore-this: 7344652d5e0720af822070d91f03daf9
3371] 
3372[allmydata/__init__.py: Nicer reporting of unparseable version numbers in dependencies. fixes #1388
3373david-sarah@jacaranda.org**20110401202750
3374 Ignore-this: 9c6bd599259d2405e1caadbb3e0d8c7f
3375] 
3376[update FTP-and-SFTP.rst: the necessary patch is included in Twisted-10.1
3377Brian Warner <warner@lothar.com>**20110325232511
3378 Ignore-this: d5307faa6900f143193bfbe14e0f01a
3379] 
3380[control.py: remove all uses of s.get_serverid()
3381warner@lothar.com**20110227011203
3382 Ignore-this: f80a787953bd7fa3d40e828bde00e855
3383] 
3384[web: remove some uses of s.get_serverid(), not all
3385warner@lothar.com**20110227011159
3386 Ignore-this: a9347d9cf6436537a47edc6efde9f8be
3387] 
3388[immutable/downloader/fetcher.py: remove all get_serverid() calls
3389warner@lothar.com**20110227011156
3390 Ignore-this: fb5ef018ade1749348b546ec24f7f09a
3391] 
3392[immutable/downloader/fetcher.py: fix diversity bug in server-response handling
3393warner@lothar.com**20110227011153
3394 Ignore-this: bcd62232c9159371ae8a16ff63d22c1b
3395 
3396 When blocks terminate (either COMPLETE or CORRUPT/DEAD/BADSEGNUM), the
3397 _shares_from_server dict was being popped incorrectly (using shnum as the
3398 index instead of serverid). I'm still thinking through the consequences of
3399 this bug. It was probably benign and really hard to detect. I think it would
3400 cause us to incorrectly believe that we're pulling too many shares from a
3401 server, and thus prefer a different server rather than asking for a second
3402 share from the first server. The diversity code is intended to spread out the
3403 number of shares simultaneously being requested from each server, but with
3404 this bug, it might be spreading out the total number of shares requested at
3405 all, not just simultaneously. (note that SegmentFetcher is scoped to a single
3406 segment, so the effect doesn't last very long).
3407] 
3408[immutable/downloader/share.py: reduce get_serverid(), one left, update ext deps
3409warner@lothar.com**20110227011150
3410 Ignore-this: d8d56dd8e7b280792b40105e13664554
3411 
3412 test_download.py: create+check MyShare instances better, make sure they share
3413 Server objects, now that finder.py cares
3414] 
3415[immutable/downloader/finder.py: reduce use of get_serverid(), one left
3416warner@lothar.com**20110227011146
3417 Ignore-this: 5785be173b491ae8a78faf5142892020
3418] 
3419[immutable/offloaded.py: reduce use of get_serverid() a bit more
3420warner@lothar.com**20110227011142
3421 Ignore-this: b48acc1b2ae1b311da7f3ba4ffba38f
3422] 
3423[immutable/upload.py: reduce use of get_serverid()
3424warner@lothar.com**20110227011138
3425 Ignore-this: ffdd7ff32bca890782119a6e9f1495f6
3426] 
3427[immutable/checker.py: remove some uses of s.get_serverid(), not all
3428warner@lothar.com**20110227011134
3429 Ignore-this: e480a37efa9e94e8016d826c492f626e
3430] 
3431[add remaining get_* methods to storage_client.Server, NoNetworkServer, and
3432warner@lothar.com**20110227011132
3433 Ignore-this: 6078279ddf42b179996a4b53bee8c421
3434 MockIServer stubs
3435] 
3436[upload.py: rearrange _make_trackers a bit, no behavior changes
3437warner@lothar.com**20110227011128
3438 Ignore-this: 296d4819e2af452b107177aef6ebb40f
3439] 
3440[happinessutil.py: finally rename merge_peers to merge_servers
3441warner@lothar.com**20110227011124
3442 Ignore-this: c8cd381fea1dd888899cb71e4f86de6e
3443] 
3444[test_upload.py: factor out FakeServerTracker
3445warner@lothar.com**20110227011120
3446 Ignore-this: 6c182cba90e908221099472cc159325b
3447] 
3448[test_upload.py: server-vs-tracker cleanup
3449warner@lothar.com**20110227011115
3450 Ignore-this: 2915133be1a3ba456e8603885437e03
3451] 
3452[happinessutil.py: server-vs-tracker cleanup
3453warner@lothar.com**20110227011111
3454 Ignore-this: b856c84033562d7d718cae7cb01085a9
3455] 
3456[upload.py: more tracker-vs-server cleanup
3457warner@lothar.com**20110227011107
3458 Ignore-this: bb75ed2afef55e47c085b35def2de315
3459] 
3460[upload.py: fix var names to avoid confusion between 'trackers' and 'servers'
3461warner@lothar.com**20110227011103
3462 Ignore-this: 5d5e3415b7d2732d92f42413c25d205d
3463] 
3464[refactor: s/peer/server/ in immutable/upload, happinessutil.py, test_upload
3465warner@lothar.com**20110227011100
3466 Ignore-this: 7ea858755cbe5896ac212a925840fe68
3467 
3468 No behavioral changes, just updating variable/method names and log messages.
3469 The effects outside these three files should be minimal: some exception
3470 messages changed (to say "server" instead of "peer"), and some internal class
3471 names were changed. A few things still use "peer" to minimize external
3472 changes, like UploadResults.timings["peer_selection"] and
3473 happinessutil.merge_peers, which can be changed later.
3474] 
3475[storage_client.py: clean up test_add_server/test_add_descriptor, remove .test_servers
3476warner@lothar.com**20110227011056
3477 Ignore-this: efad933e78179d3d5fdcd6d1ef2b19cc
3478] 
3479[test_client.py, upload.py:: remove KiB/MiB/etc constants, and other dead code
3480warner@lothar.com**20110227011051
3481 Ignore-this: dc83c5794c2afc4f81e592f689c0dc2d
3482] 
3483[test: increase timeout on a network test because Francois's ARM machine hit that timeout
3484zooko@zooko.com**20110317165909
3485 Ignore-this: 380c345cdcbd196268ca5b65664ac85b
3486 I'm skeptical that the test was proceeding correctly but ran out of time. It seems more likely that it had gotten hung. But if we raise the timeout to an even more extravagant number then we can be even more certain that the test was never going to finish.
3487] 
3488[docs/configuration.rst: add a "Frontend Configuration" section
3489Brian Warner <warner@lothar.com>**20110222014323
3490 Ignore-this: 657018aa501fe4f0efef9851628444ca
3491 
3492 this points to docs/frontends/*.rst, which were previously underlinked
3493] 
3494[web/filenode.py: avoid calling req.finish() on closed HTTP connections. Closes #1366
3495"Brian Warner <warner@lothar.com>"**20110221061544
3496 Ignore-this: 799d4de19933f2309b3c0c19a63bb888
3497] 
3498[Add unit tests for cross_check_pkg_resources_versus_import, and a regression test for ref #1355. This requires a little refactoring to make it testable.
3499david-sarah@jacaranda.org**20110221015817
3500 Ignore-this: 51d181698f8c20d3aca58b057e9c475a
3501] 
3502[allmydata/__init__.py: .name was used in place of the correct .__name__ when printing an exception. Also, robustify string formatting by using %r instead of %s in some places. fixes #1355.
3503david-sarah@jacaranda.org**20110221020125
3504 Ignore-this: b0744ed58f161bf188e037bad077fc48
3505] 
3506[Refactor StorageFarmBroker handling of servers
3507Brian Warner <warner@lothar.com>**20110221015804
3508 Ignore-this: 842144ed92f5717699b8f580eab32a51
3509 
3510 Pass around IServer instance instead of (peerid, rref) tuple. Replace
3511 "descriptor" with "server". Other replacements:
3512 
3513  get_all_servers -> get_connected_servers/get_known_servers
3514  get_servers_for_index -> get_servers_for_psi (now returns IServers)
3515 
3516 This change still needs to be pushed further down: lots of code is now
3517 getting the IServer and then distributing (peerid, rref) internally.
3518 Instead, it ought to distribute the IServer internally and delay
3519 extracting a serverid or rref until the last moment.
3520 
3521 no_network.py was updated to retain parallelism.
3522] 
3523[TAG allmydata-tahoe-1.8.2
3524warner@lothar.com**20110131020101] 
3525Patch bundle hash:
3526cc3f1072d6970ca2c498ad74043a5f4c6ffa73ac