Changeset 2481290 in trunk


Ignore:
Timestamp:
2012-03-31T00:29:06Z (13 years ago)
Author:
Brian Warner <warner@…>
Branches:
master
Children:
14a50f2
Parents:
c5e10e2
Message:

test_introducer.SystemTest?: fix race condition

SystemTest? has a couple of different phases, separated by a poller which
waits for everything to be idle (all messages delivered, none in flight). It
does this by watching some internal "_debug_outstanding" counters in the
server and in each client, and waiting for them to hit zero.

Just before the last phase, we replace the server with a new one (to make
sure clients re-send their messages properly). Unfortunately, the polling
function closed over the variable holding the original server, and didn't see
the replacement. It kept polling the old server, and failed to notice the
outstanding messages for the new server. The last phase of the test (check3)
was started too early, which failed (since some messages had not yet been
delivered), and then exploded in a flurry of dirty-reactor errors (because
some messages were delivered after test shutdown).

This replaces the closed-over-variable with a "self.the_introducer", which
seems to fix the race.

One additional place to look at in the future: the client
announcement-receive path (remote_announce) uses an eventually(). If the
message has been received and the eventual-send posted (but not yet executed)
when the poller sees it, the poller might erroneously conclude that the
client is idle and cause the same problem as above. To fix this, the poller
(probably all pollers) could be enhanced to do a flushEventualQueue before
querying the are-we-done-yet predicate function.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • TabularUnified src/allmydata/test/test_introducer.py

    rc5e10e2 r2481290  
    388388        subscribing_clients = []
    389389        publishing_clients = []
     390        self.the_introducer = introducer
    390391        privkeys = {}
    391392        expected_announcements = [0 for c in range(NUM_CLIENTS)]
     
    484485                    if c._debug_outstanding:
    485486                        return False
    486                 if introducer._debug_outstanding:
     487                if self.the_introducer._debug_outstanding:
    487488                    return False
    488489                return True
     
    496497        def _check1(res):
    497498            log.msg("doing _check1")
    498             dc = introducer._debug_counts
     499            dc = self.the_introducer._debug_counts
    499500            if server_version == V1:
    500501                # each storage server publishes a record, and (after its
     
    595596                for k in c._debug_counts:
    596597                    c._debug_counts[k] = 0
    597             for k in introducer._debug_counts:
    598                 introducer._debug_counts[k] = 0
     598            for k in self.the_introducer._debug_counts:
     599                self.the_introducer._debug_counts[k] = 0
    599600            expected_announcements[i] += 1 # new 'storage' for everyone
    600601            self.create_tub(self.central_portnum)
    601             newfurl = self.central_tub.registerReference(introducer,
     602            newfurl = self.central_tub.registerReference(self.the_introducer,
    602603                                                         furlFile=iff)
    603604            assert newfurl == self.introducer_furl
     
    615616            # assert that the introducer sent out new messages, one per
    616617            # subscriber
    617             dc = introducer._debug_counts
     618            dc = self.the_introducer._debug_counts
    618619            self.failUnlessEqual(dc["outbound_announcements"],
    619620                                 NUM_STORAGE*NUM_CLIENTS)
     
    653654            else:
    654655                introducer = IntroducerService()
    655             newfurl = self.central_tub.registerReference(introducer,
     656            self.the_introducer = introducer
     657            newfurl = self.central_tub.registerReference(self.the_introducer,
    656658                                                         furlFile=iff)
    657659            assert newfurl == self.introducer_furl
     
    664666        def _check3(res):
    665667            log.msg("doing _check3")
    666             dc = introducer._debug_counts
     668            dc = self.the_introducer._debug_counts
    667669            self.failUnlessEqual(dc["outbound_announcements"],
    668670                                 NUM_STORAGE*NUM_CLIENTS)
Note: See TracChangeset for help on using the changeset viewer.