Ticket #392: pipeline.diff
File pipeline.diff, 14.1 KB (added by warner, at 2009-04-15T19:23:12Z) |
---
src/allmydata/immutable/layout.py
diff --git a/src/allmydata/immutable/layout.py b/src/allmydata/immutable/layout.py index 6855562..6ca5339 100644
# src/allmydata/immutable/layout.py -- reconstruction of the patched hunks.
# Only the lines visible in the diff are reproduced; ranges elided between
# hunks ("...") are marked and NOT guessed at.
#
# The patch changes this import line (adds `pipeline`):
#   from allmydata.util import mathutil, idlib, observer, pipeline

class WriteBucketProxy:
    # ... (class body above this point elided in the diff) ...

    fieldstruct = ">L"

    def __init__(self, rref, data_size, block_size, num_segments,
                 num_share_hashes, uri_extension_size_max, nodeid,
                 pipeline_size=50000):
        self._rref = rref
        self._data_size = data_size
        self._block_size = block_size
        # ... (lines elided in the diff) ...
        self._create_offsets(block_size, data_size)

        # k=3, max_segment_size=128KiB gives us a typical segment of 43691
        # bytes. Setting the default pipeline_size to 50KB lets us get two
        # segments onto the wire but not a third, which would keep the pipe
        # filled.
        self._pipeline = pipeline.Pipeline(pipeline_size)

    def get_allocated_size(self):
        return (self._offsets['uri_extension'] + self.fieldsize +
                self._uri_extension_size_max)

    # ... (methods elided in the diff; the hunk context shows a caller that
    # ends with `return self._write(offset, length+data)`) ...

    def _write(self, offset, data):
        # use a Pipeline to pipeline several writes together. TODO: another
        # speedup would be to coalesce small writes into a single call: this
        # would reduce the foolscap CPU overhead per share, but wouldn't
        # reduce the number of round trips, so it might not be worth the
        # effort.
        return self._pipeline.add(len(data),
                                  self._rref.callRemote, "write", offset, data)

    def close(self):
        # queue the "close" message behind any still-pipelined writes, then
        # wait for everything in the pipeline to retire before firing.
        d = self._pipeline.add(0, self._rref.callRemote, "close")
        d.addCallback(lambda ign: self._pipeline.flush())
        return d

    def abort(self):
        return self._rref.callRemoteOnly("abort")
src/allmydata/test/test_util.py
diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py index 4a97001..16a63f0 100644
# src/allmydata/test/test_util.py -- reconstruction of the patched hunk.
# The same hunk also extends the module imports:
#   from twisted.python import log
#   from allmydata.util import statistics, dictutil, rrefutil, pipeline
# (the tail of class RemoteFailures shown as diff context is unchanged and
# not reproduced here)

class Pipeline(unittest.TestCase):
    def pause(self, *args, **kwargs):
        # stand-in for a remote call: record the invocation and hand back a
        # Deferred that the test fires by hand to "retire" the call
        d = defer.Deferred()
        self.calls.append( (d, args, kwargs) )
        return d

    def failUnlessCallsAre(self, expected):
        #print self.calls
        #print expected
        self.failUnlessEqual(len(self.calls), len(expected), self.calls)
        for i,c in enumerate(self.calls):
            self.failUnlessEqual(c[1:], expected[i], str(i))

    def test_basic(self):
        self.calls = []
        finished = []
        p = pipeline.Pipeline(100)

        d = p.flush() # fires immediately
        d.addCallbacks(finished.append, log.err)
        self.failUnlessEqual(len(finished), 1)
        finished = []

        d = p.add(10, self.pause, "one")
        # the call should start right away, and our return Deferred should
        # fire right away
        d.addCallbacks(finished.append, log.err)
        self.failUnlessEqual(len(finished), 1)
        self.failUnlessEqual(finished[0], None)
        self.failUnlessCallsAre([ ( ("one",) , {} ) ])
        self.failUnlessEqual(p.gauge, 10)

        # pipeline: [one]

        finished = []
        d = p.add(20, self.pause, "two", kw=2)
        # pipeline: [one, two]

        # the call and the Deferred should fire right away
        d.addCallbacks(finished.append, log.err)
        self.failUnlessEqual(len(finished), 1)
        self.failUnlessEqual(finished[0], None)
        self.failUnlessCallsAre([ ( ("one",) , {} ),
                                  ( ("two",) , {"kw": 2} ),
                                  ])
        self.failUnlessEqual(p.gauge, 30)

        self.calls[0][0].callback("one-result")
        # pipeline: [two]
        self.failUnlessEqual(p.gauge, 20)

        finished = []
        d = p.add(90, self.pause, "three", "posarg1")
        # pipeline: [two, three]
        flushed = []
        fd = p.flush()
        fd.addCallbacks(flushed.append, log.err)
        self.failUnlessEqual(flushed, [])

        # the call will be made right away, but the return Deferred will not,
        # because the pipeline is now full.
        d.addCallbacks(finished.append, log.err)
        self.failUnlessEqual(len(finished), 0)
        self.failUnlessCallsAre([ ( ("one",) , {} ),
                                  ( ("two",) , {"kw": 2} ),
                                  ( ("three", "posarg1"), {} ),
                                  ])
        self.failUnlessEqual(p.gauge, 110)

        self.failUnlessRaises(pipeline.SingleFileError, p.add, 10, self.pause)

        # retiring either call will unblock the pipeline, causing the #3
        # Deferred to fire
        self.calls[2][0].callback("three-result")
        # pipeline: [two]

        self.failUnlessEqual(len(finished), 1)
        self.failUnlessEqual(finished[0], None)
        self.failUnlessEqual(flushed, [])

        # retiring call#2 will finally allow the flush() Deferred to fire
        self.calls[1][0].callback("two-result")
        self.failUnlessEqual(len(flushed), 1)

    def test_errors(self):
        self.calls = []
        p = pipeline.Pipeline(100)

        d1 = p.add(200, self.pause, "one")
        d2 = p.flush()

        finished = []
        d1.addBoth(finished.append)
        self.failUnlessEqual(finished, [])

        flushed = []
        d2.addBoth(flushed.append)
        self.failUnlessEqual(flushed, [])

        self.calls[0][0].errback(ValueError("oops"))

        self.failUnlessEqual(len(finished), 1)
        f = finished[0]
        self.failUnless(isinstance(f, Failure))
        self.failUnless(f.check(pipeline.PipelineError))
        r = repr(f.value)
        self.failUnless("ValueError" in r, r)
        f2 = f.value.error
        self.failUnless(f2.check(ValueError))

        self.failUnlessEqual(len(flushed), 1)
        f = flushed[0]
        self.failUnless(isinstance(f, Failure))
        self.failUnless(f.check(pipeline.PipelineError))
        f2 = f.value.error
        self.failUnless(f2.check(ValueError))

        # now that the pipeline is in the failed state, any new calls will
        # fail immediately

        d3 = p.add(20, self.pause, "two")

        finished = []
        d3.addBoth(finished.append)
        self.failUnlessEqual(len(finished), 1)
        f = finished[0]
        self.failUnless(isinstance(f, Failure))
        self.failUnless(f.check(pipeline.PipelineError))
        r = repr(f.value)
        self.failUnless("ValueError" in r, r)
        f2 = f.value.error
        self.failUnless(f2.check(ValueError))

        d4 = p.flush()
        flushed = []
        d4.addBoth(flushed.append)
        self.failUnlessEqual(len(flushed), 1)
        f = flushed[0]
        self.failUnless(isinstance(f, Failure))
        self.failUnless(f.check(pipeline.PipelineError))
        f2 = f.value.error
        self.failUnless(f2.check(ValueError))


    def test_errors2(self):
        self.calls = []
        p = pipeline.Pipeline(100)

        d1 = p.add(10, self.pause, "one")
        d2 = p.add(20, self.pause, "two")
        d3 = p.add(30, self.pause, "three")
        d4 = p.flush()

        # one call fails, then the second one succeeds: make sure
        # ExpandableDeferredList tolerates the second one

        flushed = []
        d4.addBoth(flushed.append)
        self.failUnlessEqual(flushed, [])

        self.calls[0][0].errback(ValueError("oops"))
        self.failUnlessEqual(len(flushed), 1)
        f = flushed[0]
        self.failUnless(isinstance(f, Failure))
        self.failUnless(f.check(pipeline.PipelineError))
        f2 = f.value.error
        self.failUnless(f2.check(ValueError))

        self.calls[1][0].callback("two-result")
        self.calls[2][0].errback(ValueError("three-error"))
new file src/allmydata/util/pipeline.py
diff --git a/src/allmydata/util/pipeline.py b/src/allmydata/util/pipeline.py new file mode 100755 index 0000000..5f3b031
# src/allmydata/util/pipeline.py (new file added by this patch)
"""A size-limited pipeline of Deferred operations, usually callRemote()
messages, used to keep several writes in flight without letting an unbounded
amount of data queue up."""

from twisted.internet import defer
from twisted.python.failure import Failure
from twisted.python import log
from allmydata.util.assertutil import precondition

class PipelineError(Exception):
    """One of the pipelined messages returned an error. The received Failure
    object is stored in my .error attribute."""
    def __init__(self, error):
        # Delegate to Exception.__init__ so .args is populated: the original
        # version skipped this, which left str(exc) empty and broke pickling.
        Exception.__init__(self, error)
        self.error = error

    def __repr__(self):
        # tuple-wrap the operand so a tuple-valued .error cannot confuse %-
        # formatting; output is unchanged for ordinary values
        return "<PipelineError error=(%r)>" % (self.error,)

class SingleFileError(Exception):
    """You are not permitted to add a job to a full pipeline."""


class ExpandableDeferredList(defer.Deferred):
    # like DeferredList(fireOnOneErrback=True) with a built-in
    # gatherResults(), but you can add new Deferreds until you close it. This
    # gives you a chance to add don't-complain-about-unhandled-error errbacks
    # immediately after attachment, regardless of whether you actually end up
    # wanting the list or not.
    def __init__(self):
        defer.Deferred.__init__(self)
        self.resultsReceived = 0    # how many Deferreds have fired so far
        self.resultList = []        # slot per added Deferred, filled in order
        self.failure = None         # first Failure seen, if any
        self.closed = False         # no more addDeferred() once True

    def addDeferred(self, d):
        """Attach another Deferred to the list. Returns the same Deferred so
        callers can keep chaining on it."""
        precondition(not self.closed, "don't call addDeferred() on a closed ExpandableDeferredList")
        index = len(self.resultList)
        self.resultList.append(None)
        d.addCallbacks(self._cbDeferred, self._ebDeferred,
                       callbackArgs=(index,))
        return d

    def close(self):
        """Declare the list complete; I fire once every added Deferred has."""
        self.closed = True
        self.checkForFinished()

    def checkForFinished(self):
        # fire (at most once) only after close(): errback with the first
        # failure, or callback with the ordered result list when all arrive
        if not self.closed:
            return
        if self.called:
            return
        if self.failure:
            self.errback(self.failure)
        elif self.resultsReceived == len(self.resultList):
            self.callback(self.resultList)

    def _cbDeferred(self, res, index):
        self.resultList[index] = res
        self.resultsReceived += 1
        self.checkForFinished()
        return res  # pass the result through to later callbacks

    def _ebDeferred(self, f):
        self.failure = f
        self.checkForFinished()
        return f  # pass the Failure through to later errbacks


class Pipeline:
    """I manage a size-limited pipeline of Deferred operations, usually
    callRemote() messages."""

    def __init__(self, capacity):
        self.capacity = capacity # how full we can be
        self.gauge = 0 # how full we are
        self.failure = None
        self.waiting = [] # callers of add() who are blocked
        self.unflushed = ExpandableDeferredList()

    def add(self, _size, _func, *args, **kwargs):
        """Start _func(*args, **kwargs) immediately and account _size bytes
        against my capacity. The returned Deferred fires (with None) as soon
        as there is room for another add(); once any pipelined call has
        failed, it (and all later add()s) errback with PipelineError."""
        # We promise that all the Deferreds we return will fire in the order
        # they were returned. To make it easier to keep this promise, we
        # prohibit multiple outstanding calls to add() .
        if self.waiting:
            raise SingleFileError
        if self.failure:
            return defer.fail(self.failure)
        self.gauge += _size
        fd = defer.maybeDeferred(_func, *args, **kwargs)
        fd.addBoth(self._call_finished, _size)
        self.unflushed.addDeferred(fd)
        # swallow the PipelineError here: it is reported through the Deferreds
        # handed out by add()/flush(), not through fd's own chain
        fd.addErrback(self._eat_pipeline_errors)
        fd.addErrback(log.err, "_eat_pipeline_errors didn't eat it")
        if self.gauge < self.capacity:
            return defer.succeed(None)
        d = defer.Deferred()
        self.waiting.append(d)
        return d

    def flush(self):
        """Return a Deferred that fires when every operation added so far has
        retired, or errbacks with PipelineError if any of them failed."""
        if self.failure:
            return defer.fail(self.failure)
        d, self.unflushed = self.unflushed, ExpandableDeferredList()
        d.close()
        d.addErrback(self._flushed_error)
        return d

    def _flushed_error(self, f):
        precondition(self.failure) # should have been set by _call_finished
        return self.failure

    def _call_finished(self, res, size):
        # retire one operation: release its bytes, record/propagate failure,
        # and wake blocked add() callers when room opens up
        self.gauge -= size
        if isinstance(res, Failure):
            res = Failure(PipelineError(res))
            if not self.failure:
                self.failure = res
        if self.failure:
            while self.waiting:
                d = self.waiting.pop(0)
                d.errback(self.failure)
        else:
            while self.waiting and (self.gauge < self.capacity):
                d = self.waiting.pop(0)
                d.callback(None)
                # the d.callback() might trigger a new call to add(), which
                # will raise our gauge and might cause the pipeline to be
                # filled. So the while() loop gets a chance to tell the
                # caller to stop.
        return res

    def _eat_pipeline_errors(self, f):
        f.trap(PipelineError)
        return None