Ticket #1395: not-too-parallel-test-by-patching.diff
File not-too-parallel-test-by-patching.diff, 6.8 KB (added by zooko, at 2011-06-17T06:09:26Z) |
---|
-
src/allmydata/immutable/checker.py
diff -rN -u old-ticket1395/src/allmydata/immutable/checker.py new-ticket1395/src/allmydata/immutable/checker.py
old new 617 617 # to free up the RAM 618 618 return None 619 619 def _get_blocks(vrbp): 620 ds = [] 621 for blocknum in range(veup.num_segments): 620 def _get_block(ign, blocknum): 622 621 db = vrbp.get_block(blocknum) 623 622 db.addCallback(_discard_result) 624 ds.append(db) 625 # this gatherResults will fire once every block of this share has 626 # been downloaded and verified, or else it will errback. 627 return deferredutil.gatherResults(ds) 623 return db 624 dbs = defer.succeed(None) 625 for blocknum in range(veup.num_segments): 626 dbs.addCallback(_get_block, blocknum) 627 # The Deferred we return will fire after every block of this 628 # share has been downloaded and verified successfully, or else it 629 # will errback as soon as the first error is observed. 630 return dbs 631 628 632 d.addCallback(_get_blocks) 629 633 630 634 # if none of those errbacked, the blocks (and the hashes above them) -
src/allmydata/test/test_checker.py
diff -rN -u old-ticket1395/src/allmydata/test/test_checker.py new-ticket1395/src/allmydata/test/test_checker.py
old new 1 from allmydata.util import mockutil 1 2 2 3 import simplejson 3 4 from twisted.trial import unittest … … 319 320 320 321 d.addCallback(lambda ign: self.failUnless(really_did_break)) 321 322 return d 323 324 class CounterHolder(object): 325 def __init__(self): 326 self._num_active_block_fetches = 0 327 self._max_active_block_fetches = 0 328 329 from allmydata.immutable.checker import ValidatedReadBucketProxy 330 class MockVRBP(ValidatedReadBucketProxy): 331 def __init__(self, sharenum, bucket, share_hash_tree, num_blocks, block_size, share_size, counterholder): 332 ValidatedReadBucketProxy.__init__(self, sharenum, bucket, 333 share_hash_tree, num_blocks, 334 block_size, share_size) 335 self.counterholder = counterholder 336 337 def get_block(self, blocknum): 338 self.counterholder._num_active_block_fetches += 1 339 if self.counterholder._num_active_block_fetches > self.counterholder._max_active_block_fetches: 340 self.counterholder._max_active_block_fetches = self.counterholder._num_active_block_fetches 341 d = ValidatedReadBucketProxy.get_block(self, blocknum) 342 def _mark_no_longer_active(res): 343 self.counterholder._num_active_block_fetches -= 1 344 return res 345 d.addBoth(_mark_no_longer_active) 346 return d 347 348 class TooParallel(GridTestMixin, unittest.TestCase): 349 # bug #1395: immutable verifier was aggressively parallized, checking all 350 # blocks of all shares at the same time, blowing our memory budget and 351 # crashing with MemoryErrors on >1GB files. 352 353 @mockutil.patch('allmydata.immutable.checker.ValidatedReadBucketProxy') 354 def test_immutable(self, mockVRBPC): 355 self.basedir = "checker/TooParallel/immutable" 356 357 # If any code asks to instantiate a ValidatedReadBucketProxy, 358 # we give them a MockVRBP which is configured to use our 359 # CounterHolder. 360 counterholder = CounterHolder() 361 def make_mock_VRBP(*args, **kwargs): 362 return MockVRBP(counterholder=counterholder, *args, **kwargs) 363 mockVRBPC.side_effect = make_mock_VRBP 364 365 self.set_up_grid(num_servers=4) 366 c0 = self.g.clients[0] 367 c0.DEFAULT_ENCODING_PARAMETERS = { "k": 1, 368 "happy": 4, 369 "n": 4, 370 "max_segment_size": 5, 371 } 372 self.uris = {} 373 DATA = "data" * 100 # 400/5 = 80 blocks 374 d = c0.upload(Data(DATA, convergence="")) 375 def _do_check(ur): 376 n = c0.create_node_from_uri(ur.uri) 377 return n.check(Monitor(), verify=True) 378 d.addCallback(_do_check) 379 def _check(cr): 380 # the verifier works on all 4 shares in parallel, but only 381 # fetches one block from each share at a time, so we expect to 382 # see 4 parallel fetches 383 self.failUnlessEqual(counterholder._max_active_block_fetches, 4) 384 d.addCallback(_check) 385 return d 386 test_immutable.timeout = 10 -
src/allmydata/util/mockutil.py
diff -rN -u old-ticket1395/src/allmydata/util/mockutil.py new-ticket1395/src/allmydata/util/mockutil.py
old new 1 import mock 2 3 from mock import wraps, DEFAULT, _importer 4 from mock import _patch as original_under_patch 5 6 Deferred = None 7 try: 8 from twisted.internet import defer 9 Deferred = defer.Deferred 10 except ImportError: 11 pass 12 13 # copied from Michael Foord's mock.py and modified 14 15 class _deferrable_under_patch(original_under_patch): 16 def decorate_callable(self, func): 17 if hasattr(func, 'patchings'): 18 func.patchings.append(self) 19 return func 20 21 @wraps(func) 22 def patched(*args, **keywargs): 23 # don't use a with here (backwards compatability with 2.5) 24 extra_args = [] 25 for patching in patched.patchings: 26 arg = patching.__enter__() 27 if patching.new is DEFAULT: 28 extra_args.append(arg) 29 args += tuple(extra_args) 30 def cleanup(res): 31 for patching in reversed(getattr(patched, 'patchings', [])): 32 patching.__exit__() 33 return res 34 singleton = {} 35 retval = singleton 36 try: 37 retval = func(*args, **keywargs) 38 except: 39 cleanup(None) 40 raise 41 if Deferred is None or not isinstance(retval, Deferred): 42 return cleanup(retval) 43 retval.addBoth(cleanup) 44 return retval 45 46 patched.patchings = [self] 47 if hasattr(func, 'func_code'): 48 # not in Python 3 49 patched.compat_co_firstlineno = getattr( 50 func, "compat_co_firstlineno", 51 func.func_code.co_firstlineno 52 ) 53 return patched