Ticket #1545: readv.diff

File readv.diff, 8.0 KB (added by warner, at 2011-09-27T06:54:45Z)

Add readv() support to the storage server, and use it from the client when the server advertises it.

  • src/allmydata/immutable/downloader/share.py

    diff --git a/src/allmydata/immutable/downloader/share.py b/src/allmydata/immutable/downloader/share.py
    index d512702..8f60232 100644
    a b class Share: 
    725725        # Reconsider the removal: maybe bring it back.
    726726        ds = self._download_status
    727727
     728        v = self._server.get_version()
     729        if (v["http://allmydata.org/tahoe/protocols/storage/v1"]
     730            ["has-immutable-readv"]):
     731            # new-style readv() form
     732            readv = list(ask)
     733            if not readv:
     734                return # nothing to do
     735            lp = log.msg(format="%(share)s send_readv [%(span)s]",
     736                         share=repr(self), span=ask.dump(),
     737                         level=log.NOISY, parent=self._lp, umid="nByhWA")
     738            block_ev = ds.add_block_request(self._server, self._shnum,
     739                                            readv[0][0], readv[0][1], now())
     740            for (start, length) in readv:
     741                self._pending.add(start, length)
     742            d = self._rref.callRemote("readv", readv)
     743            d.addCallback(self._got_datav, readv, lp)
     744            d.addErrback(self._got_error, readv[0][0], readv[0][1], block_ev, lp)
     745            d.addCallback(self._trigger_loop)
     746            d.addErrback(lambda f:
     747                         log.err(format="unhandled error during send_request",
     748                                 failure=f, parent=self._lp,
     749                                 level=log.WEIRD, umid="qZu0wg"))
     750            return d
     751
     752        # old-style lots-of-read() form
    728753        for (start, length) in ask:
    729754            # TODO: quantize to reasonably-large blocks
    730755            self._pending.add(start, length)
    class Share: 
    747772    def _send_request(self, start, length):
    748773        return self._rref.callRemote("read", start, length)
    749774
     775    def _got_datav(self, datav, readv, lp):
     776        for i,(start,length) in enumerate(readv):
     777            data = datav[i]
     778            self._pending.remove(start, length)
     779            self._received.add(start, data)
     780            if len(data) < length:
     781                self._unavailable.add(start+len(data), length-len(data))
     782
    750783    def _got_data(self, data, start, length, block_ev, lp):
    751784        block_ev.finished(len(data), now())
    752785        if not self._alive:
  • src/allmydata/interfaces.py

    diff --git a/src/allmydata/interfaces.py b/src/allmydata/interfaces.py
    index cb45623..a1a6f30 100644
    a b WriteEnablerSecret = Hash # used to protect mutable bucket modifications 
    3030LeaseRenewSecret = Hash # used to protect bucket lease renewal requests
    3131LeaseCancelSecret = Hash # used to protect bucket lease cancellation requests
    3232
     33TestVector = ListOf(TupleOf(Offset, ReadSize, str, str))
     34# elements are (offset, length, operator, specimen)
     35# operator is one of "lt, le, eq, ne, ge, gt"
     36# nop always passes and is used to fetch data while writing.
     37# you should use length==len(specimen) for everything except nop
     38DataVector = ListOf(TupleOf(Offset, ShareData))
     39# (offset, data). This limits us to 30 writes of 1MiB each per call
     40TestAndWriteVectorsForShares = DictOf(int,
     41                                      TupleOf(TestVector,
     42                                              DataVector,
     43                                              ChoiceOf(None, Offset), # new_length
     44                                              ))
     45ReadVector = ListOf(TupleOf(Offset, ReadSize))
     46ReadData = ListOf(ShareData)
     47# returns data[offset:offset+length] for each element of ReadVector
     48
    3349class RIStubClient(RemoteInterface):
    3450    """Each client publishes a service announcement for a dummy object called
    3551    the StubClient. This object doesn't actually offer any services, but the
    class RIBucketWriter(RemoteInterface): 
    5975class RIBucketReader(RemoteInterface):
    6076    def read(offset=Offset, length=ReadSize):
    6177        return ShareData
     78    def readv(readv=ReadVector):
     79        return ReadData
    6280
    6381    def advise_corrupt_share(reason=str):
    6482        """Clients who discover hash failures in shares that they have
    class RIBucketReader(RemoteInterface): 
    7290        documentation.
    7391        """
    7492
    75 TestVector = ListOf(TupleOf(Offset, ReadSize, str, str))
    76 # elements are (offset, length, operator, specimen)
    77 # operator is one of "lt, le, eq, ne, ge, gt"
    78 # nop always passes and is used to fetch data while writing.
    79 # you should use length==len(specimen) for everything except nop
    80 DataVector = ListOf(TupleOf(Offset, ShareData))
    81 # (offset, data). This limits us to 30 writes of 1MiB each per call
    82 TestAndWriteVectorsForShares = DictOf(int,
    83                                       TupleOf(TestVector,
    84                                               DataVector,
    85                                               ChoiceOf(None, Offset), # new_length
    86                                               ))
    87 ReadVector = ListOf(TupleOf(Offset, ReadSize))
    88 ReadData = ListOf(ShareData)
    89 # returns data[offset:offset+length] for each element of TestVector
    90 
    9193class RIStorageServer(RemoteInterface):
    9294    __remote_name__ = "RIStorageServer.tahoe.allmydata.com"
    9395
  • src/allmydata/storage/immutable.py

    diff --git a/src/allmydata/storage/immutable.py b/src/allmydata/storage/immutable.py
    index a50ff42..8ca6a80 100644
    a b class BucketReader(Referenceable): 
    314314        self.ss.count("read")
    315315        return data
    316316
     317    def remote_readv(self, readv):
     318        start = time.time()
     319        datav = []
     320        for (offset, length) in readv:
     321            datav.append(self._share_file.read_share_data(offset, length))
     322        self.ss.add_latency("immutable-readv", time.time() - start)
     323        self.ss.count("immutable-readv")
     324        self.ss.count("immutable-vectors", len(readv))
     325        return datav
     326
    317327    def remote_advise_corrupt_share(self, reason):
    318328        return self.ss.remote_advise_corrupt_share("immutable",
    319329                                                   self.storage_index,
  • src/allmydata/storage/server.py

    diff --git a/src/allmydata/storage/server.py b/src/allmydata/storage/server.py
    index 1f39c9c..a9b452e 100644
    a b class StorageServer(service.MultiService, Referenceable): 
    7777                          "write": [],
    7878                          "close": [],
    7979                          "read": [],
     80                          "immutable-readv": [],
    8081                          "get": [],
    8182                          "writev": [], # mutable
    8283                          "readv": [],
    class StorageServer(service.MultiService, Referenceable): 
    224225                      "delete-mutable-shares-with-zero-length-writev": True,
    225226                      "fills-holes-with-zero-bytes": True,
    226227                      "prevents-read-past-end-of-share-data": True,
     228                      "has-immutable-readv": True,
    227229                      },
    228230                    "application-version": str(allmydata.__full_version__),
    229231                    }
    class StorageServer(service.MultiService, Referenceable): 
    491493    def remote_slot_readv(self, storage_index, shares, readv):
    492494        start = time.time()
    493495        self.count("readv")
     496        self.count("mutable-vectors", len(shares)*len(readv))
    494497        si_s = si_b2a(storage_index)
    495498        lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
    496499                     facility="tahoe.storage", level=log.OPERATIONAL)
  • src/allmydata/storage_client.py

    diff --git a/src/allmydata/storage_client.py b/src/allmydata/storage_client.py
    index aa696ed..d7bb38a 100644
    a b class NativeStorageServer: 
    170170        { "maximum-immutable-share-size": 2**32,
    171171          "tolerates-immutable-read-overrun": False,
    172172          "delete-mutable-shares-with-zero-length-writev": False,
     173          "fills-holes-with-zero-bytes": False,
     174          "prevents-read-past-end-of-share-data": False,
     175          "has-immutable-readv": False,
    173176          },
    174177        "application-version": "unknown: no get_version()",
    175178        }