source: trunk/src/allmydata/test/mutable/test_version.py

Last change on this file was bf5213cb, checked in by Itamar Turner-Trauring <itamar@…>, at 2023-06-14T22:01:52Z

Pacify mypy

  • Property mode set to 100644
File size: 17.2 KB
Line 
1"""
2Tests related to the way ``allmydata.mutable`` handles different versions
3of data for an object.
4"""
5
6from io import StringIO
7import os
8from typing import Optional
9
10from ..common import AsyncTestCase
11from testtools.matchers import (
12    Equals,
13    IsInstance,
14    HasLength,
15    Contains,
16)
17
18from allmydata import uri
19from allmydata.interfaces import SDMF_VERSION, MDMF_VERSION
20from allmydata.util import base32, consumer, mathutil
21from allmydata.util.fileutil import abspath_expanduser_unicode
22from allmydata.util.deferredutil import gatherResults
23from allmydata.mutable.filenode import MutableFileNode
24from allmydata.mutable.common import MODE_WRITE, MODE_READ, UnrecoverableFileError
25from allmydata.mutable.publish import MutableData
26from allmydata.scripts import debug
27from ..no_network import GridTestMixin
28from .util import PublishMixin
29from .. import common_util as testutil
30
31class Version(GridTestMixin, AsyncTestCase, testutil.ShouldFailMixin, \
32              PublishMixin):
33    def setUp(self):
34        GridTestMixin.setUp(self)
35        self.basedir = self.mktemp()
36        self.set_up_grid()
37        self.c = self.g.clients[0]
38        self.nm = self.c.nodemaker
39        self.data = b"test data" * 100000 # about 900 KiB; MDMF
40        self.small_data = b"test data" * 10 # 90 B; SDMF
41
42
43    async def do_upload_mdmf(self, data: Optional[bytes] = None) -> MutableFileNode:
44        if data is None:
45            data = self.data
46        n = await self.nm.create_mutable_file(MutableData(data),
47                                              version=MDMF_VERSION)
48        self.assertThat(n, IsInstance(MutableFileNode))
49        self.assertThat(n._protocol_version, Equals(MDMF_VERSION))
50        self.mdmf_node = n
51        return n
52
53    async def do_upload_sdmf(self, data: Optional[bytes] = None) -> MutableFileNode:
54        if data is None:
55            data = self.small_data
56        n = await self.nm.create_mutable_file(MutableData(data))
57        self.assertThat(n, IsInstance(MutableFileNode))
58        self.assertThat(n._protocol_version, Equals(SDMF_VERSION))
59        self.sdmf_node = n
60        return n
61
62    async def do_upload_empty_sdmf(self) -> MutableFileNode:
63        n = await self.nm.create_mutable_file(MutableData(b""))
64        self.assertThat(n, IsInstance(MutableFileNode))
65        self.sdmf_zero_length_node = n
66        self.assertThat(n._protocol_version, Equals(SDMF_VERSION))
67        return n
68
69    async def do_upload(self) -> MutableFileNode:
70        await self.do_upload_mdmf()
71        return await self.do_upload_sdmf()
72
73    async def test_debug(self) -> None:
74        n = await self.do_upload_mdmf()
75        fso = debug.FindSharesOptions()
76        storage_index = base32.b2a(n.get_storage_index())
77        fso.si_s = str(storage_index, "utf-8")  # command-line options are unicode on Python 3
78        fso.nodedirs = [os.path.dirname(abspath_expanduser_unicode(str(storedir)))
79                        for (i,ss,storedir)
80                        in self.iterate_servers()]
81        # This attribute isn't defined on FindSharesOptions but `find_shares()`
82        # definitely expects it...
83        fso.stdout = StringIO()  # type: ignore[attr-defined]
84        debug.find_shares(fso)
85        sharefiles = fso.stdout.getvalue().splitlines()  # type: ignore[attr-defined]
86        expected = self.nm.default_encoding_parameters["n"]
87        self.assertThat(sharefiles, HasLength(expected))
88
89        # This attribute isn't defined on DebugOptions but `dump_share()`
90        # definitely expects it...
91        do = debug.DumpOptions()
92        do["filename"] = sharefiles[0]
93        do.stdout = StringIO()  # type: ignore[attr-defined]
94        debug.dump_share(do)
95        output = do.stdout.getvalue()  # type: ignore[attr-defined]
96        lines = set(output.splitlines())
97        self.assertTrue("Mutable slot found:" in lines, output)
98        self.assertTrue(" share_type: MDMF" in lines, output)
99        self.assertTrue(" num_extra_leases: 0" in lines, output)
100        self.assertTrue(" MDMF contents:" in lines, output)
101        self.assertTrue("  seqnum: 1" in lines, output)
102        self.assertTrue("  required_shares: 3" in lines, output)
103        self.assertTrue("  total_shares: 10" in lines, output)
104        self.assertTrue("  segsize: 131073" in lines, output)
105        self.assertTrue("  datalen: %d" % len(self.data) in lines, output)
106        vcap = str(n.get_verify_cap().to_string(), "utf-8")
107        self.assertTrue("  verify-cap: %s" % vcap in lines, output)
108        cso = debug.CatalogSharesOptions()
109        cso.nodedirs = fso.nodedirs
110        # Definitely not options on CatalogSharesOptions, but the code does use
111        # stdout and stderr...
112        cso.stdout = StringIO()  # type: ignore[attr-defined]
113        cso.stderr = StringIO()  # type: ignore[attr-defined]
114        debug.catalog_shares(cso)
115        shares = cso.stdout.getvalue().splitlines()  # type: ignore[attr-defined]
116        oneshare = shares[0] # all shares should be MDMF
117        self.failIf(oneshare.startswith("UNKNOWN"), oneshare)
118        self.assertTrue(oneshare.startswith("MDMF"), oneshare)
119        fields = oneshare.split()
120        self.assertThat(fields[0], Equals("MDMF"))
121        self.assertThat(fields[1].encode("ascii"), Equals(storage_index))
122        self.assertThat(fields[2], Equals("3/10"))
123        self.assertThat(fields[3], Equals("%d" % len(self.data)))
124        self.assertTrue(fields[4].startswith("#1:"), fields[3])
125        # the rest of fields[4] is the roothash, which depends upon
126        # encryption salts and is not constant. fields[5] is the
127        # remaining time on the longest lease, which is timing dependent.
128        # The rest of the line is the quoted pathname to the share.
129
130    async def test_get_sequence_number(self) -> None:
131        await self.do_upload()
132        bv = await self.mdmf_node.get_best_readable_version()
133        self.assertThat(bv.get_sequence_number(), Equals(1))
134        bv = await self.sdmf_node.get_best_readable_version()
135        self.assertThat(bv.get_sequence_number(), Equals(1))
136
137        # Now update. The sequence number in both cases should be 1 in
138        # both cases.
139        new_data = MutableData(b"foo bar baz" * 100000)
140        new_small_data = MutableData(b"foo bar baz" * 10)
141        d1 = self.mdmf_node.overwrite(new_data)
142        d2 = self.sdmf_node.overwrite(new_small_data)
143        await gatherResults([d1, d2])
144        bv = await self.mdmf_node.get_best_readable_version()
145        self.assertThat(bv.get_sequence_number(), Equals(2))
146        bv = await self.sdmf_node.get_best_readable_version()
147        self.assertThat(bv.get_sequence_number(), Equals(2))
148
149    async def test_cap_after_upload(self) -> None:
150        # If we create a new mutable file and upload things to it, and
151        # it's an MDMF file, we should get an MDMF cap back from that
152        # file and should be able to use that.
153        # That's essentially what MDMF node is, so just check that.
154        await self.do_upload_mdmf()
155        mdmf_uri = self.mdmf_node.get_uri()
156        cap = uri.from_string(mdmf_uri)
157        self.assertTrue(isinstance(cap, uri.WriteableMDMFFileURI))
158        readonly_mdmf_uri = self.mdmf_node.get_readonly_uri()
159        cap = uri.from_string(readonly_mdmf_uri)
160        self.assertTrue(isinstance(cap, uri.ReadonlyMDMFFileURI))
161
162    async def test_mutable_version(self) -> None:
163        # assert that getting parameters from the IMutableVersion object
164        # gives us the same data as getting them from the filenode itself
165        await self.do_upload()
166        bv = await self.mdmf_node.get_best_mutable_version()
167        n = self.mdmf_node
168        self.assertThat(bv.get_writekey(), Equals(n.get_writekey()))
169        self.assertThat(bv.get_storage_index(), Equals(n.get_storage_index()))
170        self.assertFalse(bv.is_readonly())
171
172        bv = await self.sdmf_node.get_best_mutable_version()
173        n = self.sdmf_node
174        self.assertThat(bv.get_writekey(), Equals(n.get_writekey()))
175        self.assertThat(bv.get_storage_index(), Equals(n.get_storage_index()))
176        self.assertFalse(bv.is_readonly())
177
178
179    async def test_get_readonly_version(self) -> None:
180        await self.do_upload()
181        bv = await self.mdmf_node.get_best_readable_version()
182        self.assertTrue(bv.is_readonly())
183
184        # Attempting to get a mutable version of a mutable file from a
185        # filenode initialized with a readcap should return a readonly
186        # version of that same node.
187        ro = self.mdmf_node.get_readonly()
188        v = await ro.get_best_mutable_version()
189        self.assertTrue(v.is_readonly())
190
191        bv = await self.sdmf_node.get_best_readable_version()
192        self.assertTrue(bv.is_readonly())
193
194        ro = self.sdmf_node.get_readonly()
195        v = await ro.get_best_mutable_version()
196        self.assertTrue(v.is_readonly())
197
198
199    async def test_toplevel_overwrite(self) -> None:
200        new_data = MutableData(b"foo bar baz" * 100000)
201        new_small_data = MutableData(b"foo bar baz" * 10)
202        await self.do_upload()
203        await self.mdmf_node.overwrite(new_data)
204        data = await self.mdmf_node.download_best_version()
205        self.assertThat(data, Equals(b"foo bar baz" * 100000))
206        await self.sdmf_node.overwrite(new_small_data)
207        data = await self.sdmf_node.download_best_version()
208        self.assertThat(data, Equals(b"foo bar baz" * 10))
209
210
211    async def test_toplevel_modify(self) -> None:
212        await self.do_upload()
213        def modifier(old_contents, servermap, first_time):
214            return old_contents + b"modified"
215        await self.mdmf_node.modify(modifier)
216        data = await self.mdmf_node.download_best_version()
217        self.assertThat(data, Contains(b"modified"))
218        await self.sdmf_node.modify(modifier)
219        data = await self.sdmf_node.download_best_version()
220        self.assertThat(data, Contains(b"modified"))
221
222
223    async def test_version_modify(self) -> None:
224        # TODO: When we can publish multiple versions, alter this test
225        # to modify a version other than the best usable version, then
226        # test to see that the best recoverable version is that.
227        await self.do_upload()
228        def modifier(old_contents, servermap, first_time):
229            return old_contents + b"modified"
230        await self.mdmf_node.modify(modifier)
231        data = await self.mdmf_node.download_best_version()
232        self.assertThat(data, Contains(b"modified"))
233        await self.sdmf_node.modify(modifier)
234        data = await self.sdmf_node.download_best_version()
235        self.assertThat(data, Contains(b"modified"))
236
237
238    async def test_download_version(self) -> None:
239        await self.publish_multiple()
240        # We want to have two recoverable versions on the grid.
241        self._set_versions({0:0,2:0,4:0,6:0,8:0,
242                            1:1,3:1,5:1,7:1,9:1})
243        # Now try to download each version. We should get the plaintext
244        # associated with that version.
245        smap = await self._fn.get_servermap(mode=MODE_READ)
246        versions = smap.recoverable_versions()
247        assert len(versions) == 2
248
249        self.servermap = smap
250        self.version1, self.version2 = versions
251        assert self.version1 != self.version2
252
253        self.version1_seqnum = self.version1[0]
254        self.version2_seqnum = self.version2[0]
255        self.version1_index = self.version1_seqnum - 1
256        self.version2_index = self.version2_seqnum - 1
257
258        results = await self._fn.download_version(self.servermap, self.version1)
259        self.assertThat(self.CONTENTS[self.version1_index],
260                        Equals(results))
261        results = await self._fn.download_version(self.servermap, self.version2)
262        self.assertThat(self.CONTENTS[self.version2_index],
263                        Equals(results))
264
265
266    async def test_download_nonexistent_version(self) -> None:
267        await self.do_upload_mdmf()
268        servermap = await self.mdmf_node.get_servermap(mode=MODE_WRITE)
269        await self.shouldFail(UnrecoverableFileError, "nonexistent version",
270                              None,
271                              self.mdmf_node.download_version, servermap,
272                              "not a version")
273
274
275    async def _test_partial_read(self, node, expected, modes, step) -> None:
276        version = await node.get_best_readable_version()
277        for (name, offset, length) in modes:
278            await self._do_partial_read(version, name, expected, offset, length)
279        # then read the whole thing, but only a few bytes at a time, and see
280        # that the results are what we expect.
281        c = consumer.MemoryConsumer()
282        for i in range(0, len(expected), step):
283            await version.read(c, i, step)
284        self.assertThat(expected, Equals(b"".join(c.chunks)))
285
286    async def _do_partial_read(self, version, name, expected, offset, length) -> None:
287        c = consumer.MemoryConsumer()
288        await version.read(c, offset, length)
289        if length is None:
290            expected_range = expected[offset:]
291        else:
292            expected_range = expected[offset:offset+length]
293        results = b"".join(c.chunks)
294        if results != expected_range:
295            print("read([%d]+%s) got %d bytes, not %d" % \
296                  (offset, length, len(results), len(expected_range)))
297            print("got: %r ... %r" % (results[:20], results[-20:]))
298            print("exp: %r ... %r" % (expected_range[:20], expected_range[-20:]))
299            self.fail("results[%s] != expected_range" % name)
300
301    async def test_partial_read_mdmf_0(self) -> None:
302        data = b""
303        result = await self.do_upload_mdmf(data=data)
304        modes = [("all1",    0,0),
305                 ("all2",    0,None),
306                 ]
307        await self._test_partial_read(result, data, modes, 1)
308
309    async def test_partial_read_mdmf_large(self) -> None:
310        segment_boundary = mathutil.next_multiple(128 * 1024, 3)
311        modes = [("start_on_segment_boundary",              segment_boundary, 50),
312                 ("ending_one_byte_after_segment_boundary", segment_boundary-50, 51),
313                 ("zero_length_at_start",                   0, 0),
314                 ("zero_length_in_middle",                  50, 0),
315                 ("zero_length_at_segment_boundary",        segment_boundary, 0),
316                 ("complete_file1",                         0, len(self.data)),
317                 ("complete_file2",                         0, None),
318                 ]
319        result = await self.do_upload_mdmf()
320        await self._test_partial_read(result, self.data, modes, 10000)
321
322    async def test_partial_read_sdmf_0(self) -> None:
323        data = b""
324        modes = [("all1",    0,0),
325                 ("all2",    0,None),
326                 ]
327        result = await self.do_upload_sdmf(data=data)
328        await self._test_partial_read(result, data, modes, 1)
329
330    async def test_partial_read_sdmf_2(self) -> None:
331        data = b"hi"
332        modes = [("one_byte",                  0, 1),
333                 ("last_byte",                 1, 1),
334                 ("last_byte2",                1, None),
335                 ("complete_file",             0, 2),
336                 ("complete_file2",            0, None),
337                 ]
338        result = await self.do_upload_sdmf(data=data)
339        await self._test_partial_read(result, data, modes, 1)
340
341    async def test_partial_read_sdmf_90(self) -> None:
342        modes = [("start_at_middle",           50, 40),
343                 ("start_at_middle2",          50, None),
344                 ("zero_length_at_start",      0, 0),
345                 ("zero_length_in_middle",     50, 0),
346                 ("zero_length_at_end",        90, 0),
347                 ("complete_file1",            0, None),
348                 ("complete_file2",            0, 90),
349                 ]
350        result = await self.do_upload_sdmf()
351        await self._test_partial_read(result, self.small_data, modes, 10)
352
353    async def test_partial_read_sdmf_100(self) -> None:
354        data = b"test data "*10
355        modes = [("start_at_middle",           50, 50),
356                 ("start_at_middle2",          50, None),
357                 ("zero_length_at_start",      0, 0),
358                 ("zero_length_in_middle",     50, 0),
359                 ("complete_file1",            0, 100),
360                 ("complete_file2",            0, None),
361                 ]
362        result = await self.do_upload_sdmf(data=data)
363        await self._test_partial_read(result, data, modes, 10)
364
365    async def _test_read_and_download(self, node, expected) -> None:
366        version = await node.get_best_readable_version()
367        c = consumer.MemoryConsumer()
368        await version.read(c)
369        self.assertThat(expected, Equals(b"".join(c.chunks)))
370
371        c2 = consumer.MemoryConsumer()
372        await version.read(c2, offset=0, size=len(expected))
373        self.assertThat(expected, Equals(b"".join(c2.chunks)))
374
375        data = await node.download_best_version()
376        self.assertThat(expected, Equals(data))
377
378    async def test_read_and_download_mdmf(self) -> None:
379        result = await self.do_upload_mdmf()
380        await self._test_read_and_download(result, self.data)
381
382    async def test_read_and_download_sdmf(self) -> None:
383        result = await self.do_upload_sdmf()
384        await self._test_read_and_download(result, self.small_data)
385
386    async def test_read_and_download_sdmf_zero_length(self) -> None:
387        result = await self.do_upload_empty_sdmf()
388        await self._test_read_and_download(result, b"")
Note: See TracBrowser for help on using the repository browser.