Ticket #392: pipeline.diff

File pipeline.diff, 14.1 KB (added by warner at 2009-04-15T19:23:12Z)

Patch to add pipelining to immutable upload: WriteBucketProxy now issues its remote write() calls through a size-limited Pipeline, keeping several writes in flight at once instead of paying a full round trip per write.

  • src/allmydata/immutable/layout.py

    diff --git a/src/allmydata/immutable/layout.py b/src/allmydata/immutable/layout.py
    index 6855562..6ca5339 100644
    --- a/src/allmydata/immutable/layout.py
    +++ b/src/allmydata/immutable/layout.py
    @@ -3,7 +3,7 @@ from zope.interface import implements
     from twisted.internet import defer
     from allmydata.interfaces import IStorageBucketWriter, IStorageBucketReader, \
          FileTooLargeError, HASH_SIZE
    -from allmydata.util import mathutil, idlib, observer
    +from allmydata.util import mathutil, idlib, observer, pipeline
     from allmydata.util.assertutil import precondition
     from allmydata.storage.server import si_b2a
     
    @@ -93,7 +93,8 @@ class WriteBucketProxy:
         fieldstruct = ">L"
     
         def __init__(self, rref, data_size, block_size, num_segments,
    -                 num_share_hashes, uri_extension_size_max, nodeid):
    +                 num_share_hashes, uri_extension_size_max, nodeid,
    +                 pipeline_size=50000):
             self._rref = rref
             self._data_size = data_size
             self._block_size = block_size
    @@ -110,6 +111,12 @@ class WriteBucketProxy:
     
             self._create_offsets(block_size, data_size)
     
    +        # k=3, max_segment_size=128KiB gives us a typical segment of 43691
    +        # bytes. Setting the default pipeline_size to 50KB lets us get two
    +        # segments onto the wire but not a third, which would keep the pipe
    +        # filled.
    +        self._pipeline = pipeline.Pipeline(pipeline_size)
    +
         def get_allocated_size(self):
             return (self._offsets['uri_extension'] + self.fieldsize +
                     self._uri_extension_size_max)
    @@ -218,11 +225,19 @@ class WriteBucketProxy:
             return self._write(offset, length+data)
     
         def _write(self, offset, data):
    -        # TODO: for small shares, buffer the writes and do just a single call
    -        return self._rref.callRemote("write", offset, data)
    +        # use a Pipeline to pipeline several writes together. TODO: another
    +        # speedup would be to coalesce small writes into a single call: this
    +        # would reduce the foolscap CPU overhead per share, but wouldn't
    +        # reduce the number of round trips, so it might not be worth the
    +        # effort.
    +
    +        return self._pipeline.add(len(data),
    +                                  self._rref.callRemote, "write", offset, data)
     
         def close(self):
    -        return self._rref.callRemote("close")
    +        d = self._pipeline.add(0, self._rref.callRemote, "close")
    +        d.addCallback(lambda ign: self._pipeline.flush())
    +        return d
     
         def abort(self):
             return self._rref.callRemoteOnly("abort")
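
A note on the pipeline_size default above: with k=3 and max_segment_size=128KiB, each block is about ceil(131072/3) = 43691 bytes, so the gauge stays below the 50000-byte capacity after one outstanding write but not after two. A minimal sketch of the arithmetic (illustration only, not part of the patch):

    block = (128 * 1024 + 2) // 3   # ceil(131072 / 3) = 43691 bytes per block
    capacity = 50000                # WriteBucketProxy's default pipeline_size
    # after the 1st add(): gauge = 43691, still < 50000, caller not blocked
    # after the 2nd add(): gauge = 87382, >= 50000, so the 3rd add() must wait
    assert block < capacity < 2 * block
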
  • src/allmydata/test/test_util.py

    diff --git a/src/allmydata/test/test_util.py b/src/allmydata/test/test_util.py
    index 4a97001..16a63f0 100644
    --- a/src/allmydata/test/test_util.py
    +++ b/src/allmydata/test/test_util.py
    @@ -6,11 +6,12 @@ from StringIO import StringIO
     from twisted.trial import unittest
     from twisted.internet import defer, reactor
     from twisted.python.failure import Failure
    +from twisted.python import log
     
     from allmydata.util import base32, idlib, humanreadable, mathutil, hashutil
     from allmydata.util import assertutil, fileutil, deferredutil, abbreviate
     from allmydata.util import limiter, time_format, pollmixin, cachedir
    -from allmydata.util import statistics, dictutil, rrefutil
    +from allmydata.util import statistics, dictutil, rrefutil, pipeline
     from allmydata.util.rrefutil import ServerFailure
     
     class Base32(unittest.TestCase):
    @@ -1300,3 +1301,174 @@ class RemoteFailures(unittest.TestCase):
                                       rrefutil.trap_local, f, IndexError)
             d.addErrback(_check)
             return d
    +
    +class Pipeline(unittest.TestCase):
    +    def pause(self, *args, **kwargs):
    +        d = defer.Deferred()
    +        self.calls.append( (d, args, kwargs) )
    +        return d
    +
    +    def failUnlessCallsAre(self, expected):
    +        #print self.calls
    +        #print expected
    +        self.failUnlessEqual(len(self.calls), len(expected), self.calls)
    +        for i,c in enumerate(self.calls):
    +            self.failUnlessEqual(c[1:], expected[i], str(i))
    +
    +    def test_basic(self):
    +        self.calls = []
    +        finished = []
    +        p = pipeline.Pipeline(100)
    +
    +        d = p.flush() # fires immediately
    +        d.addCallbacks(finished.append, log.err)
    +        self.failUnlessEqual(len(finished), 1)
    +        finished = []
    +
    +        d = p.add(10, self.pause, "one")
    +        # the call should start right away, and our return Deferred should
    +        # fire right away
    +        d.addCallbacks(finished.append, log.err)
    +        self.failUnlessEqual(len(finished), 1)
    +        self.failUnlessEqual(finished[0], None)
    +        self.failUnlessCallsAre([ ( ("one",) , {} ) ])
    +        self.failUnlessEqual(p.gauge, 10)
    +
    +        # pipeline: [one]
    +
    +        finished = []
    +        d = p.add(20, self.pause, "two", kw=2)
    +        # pipeline: [one, two]
    +
    +        # the call and the Deferred should fire right away
    +        d.addCallbacks(finished.append, log.err)
    +        self.failUnlessEqual(len(finished), 1)
    +        self.failUnlessEqual(finished[0], None)
    +        self.failUnlessCallsAre([ ( ("one",) , {} ),
    +                                  ( ("two",) , {"kw": 2} ),
    +                                  ])
    +        self.failUnlessEqual(p.gauge, 30)
    +
    +        self.calls[0][0].callback("one-result")
    +        # pipeline: [two]
    +        self.failUnlessEqual(p.gauge, 20)
    +
    +        finished = []
    +        d = p.add(90, self.pause, "three", "posarg1")
    +        # pipeline: [two, three]
    +        flushed = []
    +        fd = p.flush()
    +        fd.addCallbacks(flushed.append, log.err)
    +        self.failUnlessEqual(flushed, [])
    +
    +        # the call will be made right away, but the return Deferred will not,
    +        # because the pipeline is now full.
    +        d.addCallbacks(finished.append, log.err)
    +        self.failUnlessEqual(len(finished), 0)
    +        self.failUnlessCallsAre([ ( ("one",) , {} ),
    +                                  ( ("two",) , {"kw": 2} ),
    +                                  ( ("three", "posarg1"), {} ),
    +                                  ])
    +        self.failUnlessEqual(p.gauge, 110)
    +
    +        self.failUnlessRaises(pipeline.SingleFileError, p.add, 10, self.pause)
    +
    +        # retiring either call will unblock the pipeline, causing the #3
    +        # Deferred to fire
    +        self.calls[2][0].callback("three-result")
    +        # pipeline: [two]
    +
    +        self.failUnlessEqual(len(finished), 1)
    +        self.failUnlessEqual(finished[0], None)
    +        self.failUnlessEqual(flushed, [])
    +
    +        # retiring call#2 will finally allow the flush() Deferred to fire
    +        self.calls[1][0].callback("two-result")
    +        self.failUnlessEqual(len(flushed), 1)
    +
    +    def test_errors(self):
    +        self.calls = []
    +        p = pipeline.Pipeline(100)
    +
    +        d1 = p.add(200, self.pause, "one")
    +        d2 = p.flush()
    +
    +        finished = []
    +        d1.addBoth(finished.append)
    +        self.failUnlessEqual(finished, [])
    +
    +        flushed = []
    +        d2.addBoth(flushed.append)
    +        self.failUnlessEqual(flushed, [])
    +
    +        self.calls[0][0].errback(ValueError("oops"))
    +
    +        self.failUnlessEqual(len(finished), 1)
    +        f = finished[0]
    +        self.failUnless(isinstance(f, Failure))
    +        self.failUnless(f.check(pipeline.PipelineError))
    +        r = repr(f.value)
    +        self.failUnless("ValueError" in r, r)
    +        f2 = f.value.error
    +        self.failUnless(f2.check(ValueError))
    +
    +        self.failUnlessEqual(len(flushed), 1)
    +        f = flushed[0]
    +        self.failUnless(isinstance(f, Failure))
    +        self.failUnless(f.check(pipeline.PipelineError))
    +        f2 = f.value.error
    +        self.failUnless(f2.check(ValueError))
    +
    +        # now that the pipeline is in the failed state, any new calls will
    +        # fail immediately
    +
    +        d3 = p.add(20, self.pause, "two")
    +
    +        finished = []
    +        d3.addBoth(finished.append)
    +        self.failUnlessEqual(len(finished), 1)
    +        f = finished[0]
    +        self.failUnless(isinstance(f, Failure))
    +        self.failUnless(f.check(pipeline.PipelineError))
    +        r = repr(f.value)
    +        self.failUnless("ValueError" in r, r)
    +        f2 = f.value.error
    +        self.failUnless(f2.check(ValueError))
    +
    +        d4 = p.flush()
    +        flushed = []
    +        d4.addBoth(flushed.append)
    +        self.failUnlessEqual(len(flushed), 1)
    +        f = flushed[0]
    +        self.failUnless(isinstance(f, Failure))
    +        self.failUnless(f.check(pipeline.PipelineError))
    +        f2 = f.value.error
    +        self.failUnless(f2.check(ValueError))
    +
    +
    +    def test_errors2(self):
    +        self.calls = []
    +        p = pipeline.Pipeline(100)
    +
    +        d1 = p.add(10, self.pause, "one")
    +        d2 = p.add(20, self.pause, "two")
    +        d3 = p.add(30, self.pause, "three")
    +        d4 = p.flush()
    +
    +        # one call fails, then the second one succeeds: make sure
    +        # ExpandableDeferredList tolerates the second one
    +
    +        flushed = []
    +        d4.addBoth(flushed.append)
    +        self.failUnlessEqual(flushed, [])
    +
    +        self.calls[0][0].errback(ValueError("oops"))
    +        self.failUnlessEqual(len(flushed), 1)
    +        f = flushed[0]
    +        self.failUnless(isinstance(f, Failure))
    +        self.failUnless(f.check(pipeline.PipelineError))
    +        f2 = f.value.error
    +        self.failUnless(f2.check(ValueError))
    +
    +        self.calls[1][0].callback("two-result")
    +        self.calls[2][0].errback(ValueError("three-error"))
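
The error-path tests above pin down the wrapping contract: Deferreds returned by add() and flush() fail with a PipelineError whose .error attribute carries the original Failure. A minimal sketch of how a caller could unwrap it (the _unwrap helper and the synchronous 1/0 failure are hypothetical, for illustration):

    from twisted.python import log
    from allmydata.util import pipeline

    def _unwrap(f):
        f.trap(pipeline.PipelineError)
        return f.value.error  # the Failure raised by the call that failed

    p = pipeline.Pipeline(100)
    p.add(10, lambda: 1/0)     # fails synchronously inside maybeDeferred
    d = p.flush()              # fires with the wrapped PipelineError
    d.addErrback(_unwrap).addErrback(log.err)  # logs the ZeroDivisionError
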
  • new file src/allmydata/util/pipeline.py

    diff --git a/src/allmydata/util/pipeline.py b/src/allmydata/util/pipeline.py
    new file mode 100755
    index 0000000..5f3b031
    --- /dev/null
    +++ b/src/allmydata/util/pipeline.py
    @@ -0,0 +1,132 @@
    +
    +from twisted.internet import defer
    +from twisted.python.failure import Failure
    +from twisted.python import log
    +from allmydata.util.assertutil import precondition
    +
    +class PipelineError(Exception):
    +    """One of the pipelined messages returned an error. The received Failure
    +    object is stored in my .error attribute."""
    +    def __init__(self, error):
    +        self.error = error
    +
    +    def __repr__(self):
    +        return "<PipelineError error=(%r)>" % self.error
    +
    +class SingleFileError(Exception):
    +    """You are not permitted to add a job to a full pipeline."""
    +
    +
    +class ExpandableDeferredList(defer.Deferred):
    +    # like DeferredList(fireOnOneErrback=True) with a built-in
    +    # gatherResults(), but you can add new Deferreds until you close it. This
    +    # gives you a chance to add don't-complain-about-unhandled-error errbacks
    +    # immediately after attachment, regardless of whether you actually end up
    +    # wanting the list or not.
    +    def __init__(self):
    +        defer.Deferred.__init__(self)
    +        self.resultsReceived = 0
    +        self.resultList = []
    +        self.failure = None
    +        self.closed = False
    +
    +    def addDeferred(self, d):
    +        precondition(not self.closed, "don't call addDeferred() on a closed ExpandableDeferredList")
    +        index = len(self.resultList)
    +        self.resultList.append(None)
    +        d.addCallbacks(self._cbDeferred, self._ebDeferred,
    +                       callbackArgs=(index,))
    +        return d
    +
    +    def close(self):
    +        self.closed = True
    +        self.checkForFinished()
    +
    +    def checkForFinished(self):
    +        if not self.closed:
    +            return
    +        if self.called:
    +            return
    +        if self.failure:
    +            self.errback(self.failure)
    +        elif self.resultsReceived == len(self.resultList):
    +            self.callback(self.resultList)
    +
    +    def _cbDeferred(self, res, index):
    +        self.resultList[index] = res
    +        self.resultsReceived += 1
    +        self.checkForFinished()
    +        return res
    +
    +    def _ebDeferred(self, f):
    +        self.failure = f
    +        self.checkForFinished()
    +        return f
    +
    +
    +class Pipeline:
    +    """I manage a size-limited pipeline of Deferred operations, usually
    +    callRemote() messages."""
    +
    +    def __init__(self, capacity):
    +        self.capacity = capacity # how full we can be
    +        self.gauge = 0 # how full we are
    +        self.failure = None
    +        self.waiting = [] # callers of add() who are blocked
    +        self.unflushed = ExpandableDeferredList()
    +
    +    def add(self, _size, _func, *args, **kwargs):
    +        # We promise that all the Deferreds we return will fire in the order
    +        # they were returned. To make it easier to keep this promise, we
    +        # prohibit multiple outstanding calls to add() .
    +        if self.waiting:
    +            raise SingleFileError
    +        if self.failure:
    +            return defer.fail(self.failure)
    +        self.gauge += _size
    +        fd = defer.maybeDeferred(_func, *args, **kwargs)
    +        fd.addBoth(self._call_finished, _size)
    +        self.unflushed.addDeferred(fd)
    +        fd.addErrback(self._eat_pipeline_errors)
    +        fd.addErrback(log.err, "_eat_pipeline_errors didn't eat it")
    +        if self.gauge < self.capacity:
    +            return defer.succeed(None)
    +        d = defer.Deferred()
    +        self.waiting.append(d)
    +        return d
    +
    +    def flush(self):
    +        if self.failure:
    +            return defer.fail(self.failure)
    +        d, self.unflushed = self.unflushed, ExpandableDeferredList()
    +        d.close()
    +        d.addErrback(self._flushed_error)
    +        return d
    +
    +    def _flushed_error(self, f):
    +        precondition(self.failure) # should have been set by _call_finished
    +        return self.failure
    +
    +    def _call_finished(self, res, size):
    +        self.gauge -= size
    +        if isinstance(res, Failure):
    +            res = Failure(PipelineError(res))
    +            if not self.failure:
    +                self.failure = res
    +        if self.failure:
    +            while self.waiting:
    +                d = self.waiting.pop(0)
    +                d.errback(self.failure)
    +        else:
    +            while self.waiting and (self.gauge < self.capacity):
    +                d = self.waiting.pop(0)
    +                d.callback(None)
    +                # the d.callback() might trigger a new call to add(), which
    +                # will raise our gauge and might cause the pipeline to be
    +                # filled. So the while() loop gets a chance to tell the
    +                # caller to stop.
    +        return res
    +
    +    def _eat_pipeline_errors(self, f):
    +        f.trap(PipelineError)
    +        return None
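
For context, a minimal sketch of the intended usage pattern, mirroring what WriteBucketProxy._write() does (the slow_write helper and the block sizes are hypothetical stand-ins for rref.callRemote("write", offset, data)):

    from twisted.internet import defer, reactor, task
    from allmydata.util import pipeline

    def slow_write(data):
        # stand-in for a remote write: a Deferred that fires a bit later
        return task.deferLater(reactor, 0.1, lambda: None)

    @defer.inlineCallbacks
    def upload(blocks):
        p = pipeline.Pipeline(50000)  # same capacity WriteBucketProxy uses
        for block in blocks:
            # add() fires immediately while gauge < capacity, so several
            # writes stay on the wire; it only blocks once the pipe is full
            yield p.add(len(block), slow_write, block)
        yield p.flush()               # wait for every write to retire

Because add() refuses concurrent outstanding calls (SingleFileError), each Pipeline serves exactly one writer, which matches WriteBucketProxy's one-pipeline-per-share usage.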