source: trunk/src/allmydata/test/mutable/util.py

Last change on this file was fec97256, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2025-01-06T21:51:37Z

trim Python2 syntax

1"""
2Ported to Python 3.
3"""
4
5
6from io import BytesIO
7import attr
8from twisted.internet import defer, reactor
9from foolscap.api import eventually, fireEventually
10from allmydata import client
11from allmydata.nodemaker import NodeMaker
12from allmydata.interfaces import SDMF_VERSION, MDMF_VERSION
13from allmydata.util import base32
14from allmydata.util.hashutil import tagged_hash
15from allmydata.storage_client import StorageFarmBroker
16from allmydata.mutable.layout import MDMFSlotReadProxy
17from allmydata.mutable.publish import MutableData
18from ..common import (
19    EMPTY_CLIENT_CONFIG,
20)
21
def bchr(s):
    return bytes([s])

def eventuaaaaaly(res=None):
    d = fireEventually(res)
    d.addCallback(fireEventually)
    d.addCallback(fireEventually)
    return d

31# this "FakeStorage" exists to put the share data in RAM and avoid using real
32# network connections, both to speed up the tests and to reduce the amount of
33# non-mutable.py code being exercised.
34
35class FakeStorage:
36    # this class replaces the collection of storage servers, allowing the
37    # tests to examine and manipulate the published shares. It also lets us
38    # control the order in which read queries are answered, to exercise more
39    # of the error-handling code in Retrieve .
40    #
41    # Note that we ignore the storage index: this FakeStorage instance can
42    # only be used for a single storage index.
43
44
    def __init__(self):
        self._peers = {}
        # _sequence is used to cause the responses to occur in a specific
        # order. If it is in use, then we will defer queries instead of
        # answering them right away, accumulating the Deferreds in a dict. We
        # don't know exactly how many queries we'll get, so exactly one
        # second after the first query arrives, we will release them all (in
        # order).
        self._sequence = None
        self._pending = {}
        self._pending_timer = None

    def read(self, peerid, storage_index):
        shares = self._peers.get(peerid, {})
        if self._sequence is None:
            return eventuaaaaaly(shares)
        d = defer.Deferred()
        if not self._pending:
            self._pending_timer = reactor.callLater(1.0, self._fire_readers)
        if peerid not in self._pending:
            self._pending[peerid] = []
        self._pending[peerid].append( (d, shares) )
        return d

    def _fire_readers(self):
        self._pending_timer = None
        pending = self._pending
        self._pending = {}
        for peerid in self._sequence:
            if peerid in pending:
                for (d, shares) in pending.pop(peerid):
                    eventually(d.callback, shares)
        for peerid in pending:
            for (d, shares) in pending[peerid]:
                eventually(d.callback, shares)

    def write(self, peerid, storage_index, shnum, offset, data):
        if peerid not in self._peers:
            self._peers[peerid] = {}
        shares = self._peers[peerid]
        f = BytesIO()
        f.write(shares.get(shnum, b""))
        f.seek(offset)
        f.write(data)
        shares[shnum] = f.getvalue()


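# Illustrative sketch (not part of the original module): tests that want
# deterministic answer ordering assign FakeStorage._sequence before any
# reads arrive; read() then defers every response, and _fire_readers
# releases them in the given peer order one second later.
def _example_force_read_order(storage, peerids):
    storage._sequence = list(peerids)
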
# This doesn't actually implement the whole interface, but we add a
# commented-out interface annotation for grepping purposes.
#@implementer(RIStorageServer)
class FakeStorageServer:
    """
    A fake Foolscap remote object, implemented by overriding callRemote() to
    call local methods.
    """
    def __init__(self, peerid, storage):
        self.peerid = peerid
        self.storage = storage
        self.queries = 0
    def callRemote(self, methname, *args, **kwargs):
        self.queries += 1
        def _call():
            meth = getattr(self, methname)
            return meth(*args, **kwargs)
        d = fireEventually()
        d.addCallback(lambda res: _call())
        return d

    def callRemoteOnly(self, methname, *args, **kwargs):
        self.queries += 1
        d = self.callRemote(methname, *args, **kwargs)
        d.addBoth(lambda ignore: None)

    def advise_corrupt_share(self, share_type, storage_index, shnum, reason):
        pass

    def slot_readv(self, storage_index, shnums, readv):
        d = self.storage.read(self.peerid, storage_index)
        def _read(shares):
            response = {}
            for shnum in shares:
                if shnums and shnum not in shnums:
                    continue
                vector = response[shnum] = []
                for (offset, length) in readv:
                    assert isinstance(offset, int), offset
                    assert isinstance(length, int), length
                    vector.append(shares[shnum][offset:offset+length])
            return response
        d.addCallback(_read)
        return d

    def slot_testv_and_readv_and_writev(self, storage_index, secrets,
                                        tw_vectors, read_vector):
        # always-pass: parrot the test vectors back to them.
        readv = {}
        for shnum, (testv, writev, new_length) in list(tw_vectors.items()):
            for (offset, length, op, specimen) in testv:
                assert op == b"eq"
            # TODO: this isn't right, the read is controlled by read_vector,
            # not by testv
            readv[shnum] = [ specimen
                             for (offset, length, op, specimen)
                             in testv ]
            for (offset, data) in writev:
                self.storage.write(self.peerid, storage_index, shnum,
                                   offset, data)
        answer = (True, readv)
        return fireEventually(answer)


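# Illustrative sketch (not part of the original module): callRemote()
# looks the method name up on the fake server itself and fires the call
# on a later reactor turn, so tests see Foolscap-like asynchrony.
def _example_call_remote(server, storage_index):
    # Same result as server.slot_readv(storage_index, [0], [(0, 100)]),
    # but delivered via a Deferred that fires eventually.
    return server.callRemote("slot_readv", storage_index, [0], [(0, 100)])
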
def flip_bit(original, byte_offset):
    return (original[:byte_offset] +
            bchr(ord(original[byte_offset:byte_offset+1]) ^ 0x01) +
            original[byte_offset+1:])

def add_two(original, byte_offset):
    # It isn't enough to simply flip the low bit of the version number,
    # because 1 is a valid version number. So we flip the 0x02 bit instead,
    # which adds two to either of the valid verbytes (0 and 1).
    return (original[:byte_offset] +
            bchr(ord(original[byte_offset:byte_offset+1]) ^ 0x02) +
            original[byte_offset+1:])

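# Illustrative sketch (not part of the original module): both helpers
# return a new bytes object with exactly one byte changed.
def _example_corruption_helpers():
    assert flip_bit(b"\x00\x07\x00", 1) == b"\x00\x06\x00"  # 0x07 ^ 0x01
    assert add_two(b"\x01", 0) == b"\x03"                   # 0x01 ^ 0x02
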
def corrupt(res, s, offset, shnums_to_corrupt=None, offset_offset=0):
    # if shnums_to_corrupt is None, corrupt all shares. Otherwise it is a
    # list of shnums to corrupt.
    ds = []
    for peerid in s._peers:
        shares = s._peers[peerid]
        for shnum in shares:
            if (shnums_to_corrupt is not None
                and shnum not in shnums_to_corrupt):
                continue
            data = shares[shnum]
            # We're feeding the reader all of the share data, so it
            # won't need to use the rref that we didn't provide, nor the
            # storage index that we didn't provide. We do this because
            # the reader will work for both MDMF and SDMF.
            reader = MDMFSlotReadProxy(None, None, shnum, data)
            # We need to get the offsets for the next part.
            d = reader.get_verinfo()
            def _do_corruption(verinfo, data, shnum, shares):
                (seqnum,
                 root_hash,
                 IV,
                 segsize,
                 datalen,
                 k, n, prefix, o) = verinfo
                if isinstance(offset, tuple):
                    offset1, offset2 = offset
                else:
                    offset1 = offset
                    offset2 = 0
                if offset1 == "pubkey" and IV:
                    real_offset = 107
                elif offset1 in o:
                    real_offset = o[offset1]
                else:
                    real_offset = offset1
                real_offset = int(real_offset) + offset2 + offset_offset
                assert isinstance(real_offset, int), offset
                if offset1 == 0: # verbyte
                    f = add_two
                else:
                    f = flip_bit
                shares[shnum] = f(data, real_offset)
            d.addCallback(_do_corruption, data, shnum, shares)
            ds.append(d)
    dl = defer.DeferredList(ds)
    dl.addCallback(lambda ignored: res)
    return dl

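# Illustrative sketch (not part of the original module): corrupt() is
# written to slot into a Deferred chain, passing its first argument
# through unchanged. Here only share 0 is damaged, at the "signature"
# entry of the share's offset table.
def _example_corrupt_signature(d, storage):
    d.addCallback(corrupt, storage, "signature", shnums_to_corrupt=[0])
    return d
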
@attr.s
class Peer:
    peerid = attr.ib()
    storage_server = attr.ib()
    announcement = attr.ib()

def make_peer(s, i):
    """
    Create a "peer" suitable for use with ``make_storagebroker_with_peers`` or
    ``make_nodemaker_with_peers``.

    :param FakeStorage s: The fake storage with which to associate the peers.

    :param int i: A unique identifier for this peer within the whole group of
        peers to be used.  For example, a sequence number.  This is used to
        generate a unique peer id.

    :rtype: ``Peer``
    """
    peerid = base32.b2a(tagged_hash(b"peerid", b"%d" % i)[:20])
    fss = FakeStorageServer(peerid, s)
    ann = {
        "anonymous-storage-FURL": "pb://%s@nowhere/fake" % (str(peerid, "utf-8"),),
        "permutation-seed-base32": peerid,
    }
    return Peer(peerid=peerid, storage_server=fss, announcement=ann)


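# Illustrative sketch (not part of the original module): build peers by
# hand when a test needs direct access to each FakeStorageServer, for
# example to count the queries it received.
def _example_broker_with_known_peers(num_peers=3):
    s = FakeStorage()
    peers = [make_peer(s, i) for i in range(num_peers)]
    return s, peers, make_storagebroker_with_peers(peers)
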
def make_storagebroker(s=None, num_peers=10):
    """
    Make a ``StorageFarmBroker`` connected to some number of fake storage
    servers.

    :param FakeStorage s: The fake storage with which to associate the fake
        storage servers.

    :param int num_peers: The number of fake storage servers to associate with
        the broker.
    """
    if not s:
        s = FakeStorage()
    peers = []
    for peer_num in range(num_peers):
        peers.append(make_peer(s, peer_num))
    return make_storagebroker_with_peers(peers)


def make_storagebroker_with_peers(peers):
    """
    Make a ``StorageFarmBroker`` connected to the given storage servers.

    :param list peers: The storage servers to associate with the storage
        broker.
    """
    storage_broker = StorageFarmBroker(True, None, EMPTY_CLIENT_CONFIG)
    for peer in peers:
        storage_broker.test_add_rref(
            peer.peerid,
            peer.storage_server,
            peer.announcement,
        )
    return storage_broker


def make_nodemaker(s=None, num_peers=10):
    """
    Make a ``NodeMaker`` connected to some number of fake storage servers.

    :param FakeStorage s: The fake storage with which to associate the fake
        storage servers.

    :param int num_peers: The number of fake storage servers to associate with
        the node maker.
    """
    storage_broker = make_storagebroker(s, num_peers)
    return make_nodemaker_with_storage_broker(storage_broker)


def make_nodemaker_with_peers(peers):
    """
    Make a ``NodeMaker`` connected to the given storage servers.

    :param list peers: The storage servers to associate with the node maker.
    """
    storage_broker = make_storagebroker_with_peers(peers)
    return make_nodemaker_with_storage_broker(storage_broker)


def make_nodemaker_with_storage_broker(storage_broker):
    """
    Make a ``NodeMaker`` using the given storage broker.

    :param StorageFarmBroker storage_broker: The storage broker to use.
    """
    sh = client.SecretHolder(b"lease secret", b"convergence secret")
    keygen = client.KeyGenerator()
    nodemaker = NodeMaker(storage_broker, sh, None,
                          None, None,
                          {"k": 3, "n": 10}, SDMF_VERSION, keygen)
    return nodemaker


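# Illustrative sketch (not part of the original module): the same wiring
# PublishMixin.publish_one() performs below, keeping the FakeStorage
# around so the resulting shares can be inspected or corrupted.
def _example_create_mutable_file():
    s = FakeStorage()
    nodemaker = make_nodemaker(s)
    d = nodemaker.create_mutable_file(MutableData(b"contents"))
    return s, d
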
class PublishMixin:
    def publish_one(self):
        # publish a file and create shares, which can then be manipulated
        # later.
        self.CONTENTS = b"New contents go here" * 1000
        self.uploadable = MutableData(self.CONTENTS)
        self._storage = FakeStorage()
        self._nodemaker = make_nodemaker(self._storage)
        self._storage_broker = self._nodemaker.storage_broker
        d = self._nodemaker.create_mutable_file(self.uploadable)
        def _created(node):
            self._fn = node
            self._fn2 = self._nodemaker.create_from_cap(node.get_uri())
        d.addCallback(_created)
        return d

    def publish_mdmf(self, data=None):
        # like publish_one, except that the result is guaranteed to be
        # an MDMF file.
        # self.CONTENTS should have more than one segment.
        if data is None:
            data = b"This is an MDMF file" * 100000
        self.CONTENTS = data
        self.uploadable = MutableData(self.CONTENTS)
        self._storage = FakeStorage()
        self._nodemaker = make_nodemaker(self._storage)
        self._storage_broker = self._nodemaker.storage_broker
        d = self._nodemaker.create_mutable_file(self.uploadable, version=MDMF_VERSION)
        def _created(node):
            self._fn = node
            self._fn2 = self._nodemaker.create_from_cap(node.get_uri())
        d.addCallback(_created)
        return d


    def publish_sdmf(self, data=None):
        # like publish_one, except that the result is guaranteed to be
        # an SDMF file
        if data is None:
            data = b"This is an SDMF file" * 1000
        self.CONTENTS = data
        self.uploadable = MutableData(self.CONTENTS)
        self._storage = FakeStorage()
        self._nodemaker = make_nodemaker(self._storage)
        self._storage_broker = self._nodemaker.storage_broker
        d = self._nodemaker.create_mutable_file(self.uploadable, version=SDMF_VERSION)
        def _created(node):
            self._fn = node
            self._fn2 = self._nodemaker.create_from_cap(node.get_uri())
        d.addCallback(_created)
        return d


    def publish_multiple(self, version=0):
        self.CONTENTS = [b"Contents 0",
                         b"Contents 1",
                         b"Contents 2",
                         b"Contents 3a",
                         b"Contents 3b"]
        self.uploadables = [MutableData(d) for d in self.CONTENTS]
        self._copied_shares = {}
        self._storage = FakeStorage()
        self._nodemaker = make_nodemaker(self._storage)
        d = self._nodemaker.create_mutable_file(self.uploadables[0], version=version) # seqnum=1
        def _created(node):
            self._fn = node
            # now create multiple versions of the same file, and accumulate
            # their shares, so we can mix and match them later.
            d = defer.succeed(None)
            d.addCallback(self._copy_shares, 0)
            d.addCallback(lambda res: node.overwrite(self.uploadables[1])) #s2
            d.addCallback(self._copy_shares, 1)
            d.addCallback(lambda res: node.overwrite(self.uploadables[2])) #s3
            d.addCallback(self._copy_shares, 2)
            d.addCallback(lambda res: node.overwrite(self.uploadables[3])) #s4a
            d.addCallback(self._copy_shares, 3)
            # now we replace all the shares with version s3, and upload a new
            # version to get s4b.
            rollback = dict([(i,2) for i in range(10)])
            d.addCallback(lambda res: self._set_versions(rollback))
            d.addCallback(lambda res: node.overwrite(self.uploadables[4])) #s4b
            d.addCallback(self._copy_shares, 4)
            # we leave the storage in state 4
            return d
        d.addCallback(_created)
        return d


    def _copy_shares(self, ignored, index):
        shares = self._storage._peers
        # we need a deep copy
        new_shares = {}
        for peerid in shares:
            new_shares[peerid] = {}
            for shnum in shares[peerid]:
                new_shares[peerid][shnum] = shares[peerid][shnum]
        self._copied_shares[index] = new_shares

    def _set_versions(self, versionmap):
        # versionmap maps shnums to which version (0,1,2,3,4) we want the
        # share to be at. Any shnum which is left out of the map will stay at
        # its current version.
        shares = self._storage._peers
        oldshares = self._copied_shares
        for peerid in shares:
            for shnum in shares[peerid]:
                if shnum in versionmap:
                    index = versionmap[shnum]
                    shares[peerid][shnum] = oldshares[index][peerid][shnum]

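# Illustrative sketch (not part of the original module): after
# publish_multiple(), a test can mix versions, e.g. roll shares 0-4 back
# to version 2 (s3) while shares 5-9 stay at version 4 (s4b), to see how
# later reads choose among competing versions.
def _example_mixed_versions(mixin):
    mixin._set_versions({shnum: 2 for shnum in range(5)})
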
class CheckerMixin:
    def check_good(self, r, where):
        self.failUnless(r.is_healthy(), where)
        return r

    def check_bad(self, r, where):
        self.failIf(r.is_healthy(), where)
        return r

    def check_expected_failure(self, r, expected_exception, substring, where):
        for (peerid, storage_index, shnum, f) in r.get_share_problems():
            if f.check(expected_exception):
                self.failUnless(substring in str(f),
                                "%s: substring '%s' not in '%s'" %
                                (where, substring, str(f)))
                return
        self.fail("%s: didn't see expected exception %s in problems %s" %
                  (where, expected_exception, r.get_share_problems()))