source: trunk/src/allmydata/test/test_system.py

Last change on this file was 20d2c694, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2025-01-06T21:51:21Z

trim "six" usage

  • Property mode set to 100644
File size: 83.7 KB
Line 
1"""
2Ported to Python 3.
3"""
4from __future__ import annotations
5
6import os, re, sys, time, json
7from typing import Optional
8
9from bs4 import BeautifulSoup
10
11from twisted.trial import unittest
12from twisted.internet import defer
13
14from allmydata import uri
15from allmydata.storage.mutable import MutableShareFile
16from allmydata.storage.immutable import ShareFile
17from allmydata.storage.server import si_a2b
18from allmydata.immutable import offloaded, upload
19from allmydata.immutable.literal import LiteralFileNode
20from allmydata.immutable.filenode import ImmutableFileNode
21from allmydata.util import idlib, mathutil
22from allmydata.util import log, base32
23from allmydata.util.encodingutil import quote_output, unicode_to_argv
24from allmydata.util.fileutil import abspath_expanduser_unicode
25from allmydata.util.consumer import MemoryConsumer, download_to_data
26from allmydata.util.deferredutil import async_to_deferred
27from allmydata.interfaces import IDirectoryNode, IFileNode, \
28     NoSuchChildError, NoSharesError, SDMF_VERSION, MDMF_VERSION
29from allmydata.monitor import Monitor
30from allmydata.mutable.common import NotWriteableError
31from allmydata.mutable import layout as mutable_layout
32from allmydata.mutable.publish import MutableData
33
34from foolscap.api import DeadReferenceError, fireEventually
35from twisted.python.failure import Failure
36from twisted.internet.utils import (
37    getProcessOutputAndValue,
38)
39
40from .common_web import do_http as do_http_bytes, Error
41from .web.common import (
42    assert_soup_has_tag_with_attributes
43)
44from .common_system import SystemTestMixin
45from .common_util import run_cli_unicode
46
47def byteschr(x):
48    return bytes([x])
49
50class RunBinTahoeMixin:
51    def run_bintahoe(self, args, stdin=None, python_options:Optional[list[str]]=None, env=None):
52        # test_runner.run_bintahoe has better unicode support but doesn't
53        # support env yet and is also synchronous.  If we could get rid of
54        # this in favor of that, though, it would probably be an improvement.
55        if python_options is None:
56            python_options = []
57        command = sys.executable
58        argv = python_options + ["-b", "-m", "allmydata.scripts.runner"] + args
59
60        if env is None:
61            env = os.environ
62
63        d = getProcessOutputAndValue(command, argv, env, stdinBytes=stdin)
64        def fix_signal(result):
65            # Mirror subprocess.Popen.returncode structure
66            (out, err, signal) = result
67            return (out, err, -signal)
68        d.addErrback(fix_signal)
69        return d
70
71
72def run_cli(*args, **kwargs):
73    """
74    Run a Tahoe-LAFS CLI utility, but inline.
75
76    Version of run_cli_unicode() that takes any kind of string, and the
77    command-line args inline instead of as verb + list.
78
79    Backwards compatible version so we don't have to change all the tests that
80    expected this API.
81    """
82    nodeargs = [a for a in kwargs.pop("nodeargs", [])]
83    kwargs["nodeargs"] = nodeargs
84    return run_cli_unicode(args[0], [a for a in args[1:]], **kwargs)
85
86
87def do_http(*args, **kwargs):
88    """Wrapper for do_http() that returns Unicode."""
89    return do_http_bytes(*args, **kwargs).addCallback(
90        lambda b: str(b, "utf-8"))
91
92
93LARGE_DATA = b"""
94This is some data to publish to the remote grid.., which needs to be large
95enough to not fit inside a LIT uri.
96"""
97
98
99class CountingDataUploadable(upload.Data):
100    bytes_read = 0
101    interrupt_after = None
102    interrupt_after_d = None
103
104    def read(self, length):
105        self.bytes_read += length
106        if self.interrupt_after is not None:
107            if self.bytes_read > self.interrupt_after:
108                self.interrupt_after = None
109                self.interrupt_after_d.callback(self)
110        return upload.Data.read(self, length)
111
112
113class SystemTest(SystemTestMixin, RunBinTahoeMixin, unittest.TestCase):
114    """Foolscap integration-y tests."""
115    FORCE_FOOLSCAP_FOR_STORAGE = True
116    timeout = 300
117
118    @property
119    def basedir(self):
120        return "system/SystemTest/{}-foolscap-{}".format(
121            self.id().split(".")[-1], self.FORCE_FOOLSCAP_FOR_STORAGE
122        )
123
124    def test_connections(self):
125        d = self.set_up_nodes()
126        self.extra_node = None
127        d.addCallback(lambda res: self.add_extra_node(self.numclients))
128        def _check(extra_node):
129            self.extra_node = extra_node
130            for c in self.clients:
131                all_peerids = c.get_storage_broker().get_all_serverids()
132                self.failUnlessEqual(len(all_peerids), self.numclients+1)
133                sb = c.storage_broker
134                permuted_peers = sb.get_servers_for_psi("a")
135                self.failUnlessEqual(len(permuted_peers), self.numclients+1)
136
137        d.addCallback(_check)
138        def _shutdown_extra_node(res):
139            if self.extra_node:
140                return self.extra_node.stopService()
141            return res
142        d.addBoth(_shutdown_extra_node)
143        return d
144    # test_connections is subsumed by test_upload_and_download, and takes
145    # quite a while to run on a slow machine (because of all the TLS
146    # connections that must be established). If we ever rework the introducer
147    # code to such an extent that we're not sure if it works anymore, we can
148    # reinstate this test until it does.
149    del test_connections
150
151    def test_upload_and_download_random_key(self):
152        return self._test_upload_and_download(convergence=None)
153
154    def test_upload_and_download_convergent(self):
155        return self._test_upload_and_download(convergence=b"some convergence string")
156
157    def _test_upload_and_download(self, convergence):
158        # we use 4000 bytes of data, which will result in about 400k written
159        # to disk among all our simulated nodes
160        DATA = b"Some data to upload\n" * 200
161        d = self.set_up_nodes()
162        def _check_connections(res):
163            for c in self.clients:
164                c.encoding_params['happy'] = 5
165                all_peerids = c.get_storage_broker().get_all_serverids()
166                self.failUnlessEqual(len(all_peerids), self.numclients)
167                sb = c.storage_broker
168                permuted_peers = sb.get_servers_for_psi(b"a")
169                self.failUnlessEqual(len(permuted_peers), self.numclients)
170        d.addCallback(_check_connections)
171
172        def _do_upload(res):
173            log.msg("UPLOADING")
174            u = self.clients[0].getServiceNamed("uploader")
175            self.uploader = u
176            # we crank the max segsize down to 1024b for the duration of this
177            # test, so we can exercise multiple segments. It is important
178            # that this is not a multiple of the segment size, so that the
179            # tail segment is not the same length as the others. This actualy
180            # gets rounded up to 1025 to be a multiple of the number of
181            # required shares (since we use 25 out of 100 FEC).
182            up = upload.Data(DATA, convergence=convergence)
183            up.max_segment_size = 1024
184            d1 = u.upload(up)
185            return d1
186        d.addCallback(_do_upload)
187        def _upload_done(results):
188            theuri = results.get_uri()
189            log.msg("upload finished: uri is %r" % (theuri,))
190            self.uri = theuri
191            assert isinstance(self.uri, bytes), self.uri
192            self.cap = uri.from_string(self.uri)
193            self.n = self.clients[1].create_node_from_uri(self.uri)
194        d.addCallback(_upload_done)
195
196        def _upload_again(res):
197            # Upload again. If using convergent encryption then this ought to be
198            # short-circuited, however with the way we currently generate URIs
199            # (i.e. because they include the roothash), we have to do all of the
200            # encoding work, and only get to save on the upload part.
201            log.msg("UPLOADING AGAIN")
202            up = upload.Data(DATA, convergence=convergence)
203            up.max_segment_size = 1024
204            return self.uploader.upload(up)
205        d.addCallback(_upload_again)
206
207        def _download_to_data(res):
208            log.msg("DOWNLOADING")
209            return download_to_data(self.n)
210        d.addCallback(_download_to_data)
211        def _download_to_data_done(data):
212            log.msg("download finished")
213            self.failUnlessEqual(data, DATA)
214        d.addCallback(_download_to_data_done)
215
216        def _test_read(res):
217            n = self.clients[1].create_node_from_uri(self.uri)
218            d = download_to_data(n)
219            def _read_done(data):
220                self.failUnlessEqual(data, DATA)
221            d.addCallback(_read_done)
222            d.addCallback(lambda ign:
223                          n.read(MemoryConsumer(), offset=1, size=4))
224            def _read_portion_done(mc):
225                self.failUnlessEqual(b"".join(mc.chunks), DATA[1:1+4])
226            d.addCallback(_read_portion_done)
227            d.addCallback(lambda ign:
228                          n.read(MemoryConsumer(), offset=2, size=None))
229            def _read_tail_done(mc):
230                self.failUnlessEqual(b"".join(mc.chunks), DATA[2:])
231            d.addCallback(_read_tail_done)
232            d.addCallback(lambda ign:
233                          n.read(MemoryConsumer(), size=len(DATA)+1000))
234            def _read_too_much(mc):
235                self.failUnlessEqual(b"".join(mc.chunks), DATA)
236            d.addCallback(_read_too_much)
237
238            return d
239        d.addCallback(_test_read)
240
241        def _test_bad_read(res):
242            bad_u = uri.from_string_filenode(self.uri)
243            bad_u.key = self.flip_bit(bad_u.key)
244            bad_n = self.clients[1].create_node_from_uri(bad_u.to_string())
245            # this should cause an error during download
246
247            d = self.shouldFail2(NoSharesError, "'download bad node'",
248                                 None,
249                                 bad_n.read, MemoryConsumer(), offset=2)
250            return d
251        d.addCallback(_test_bad_read)
252
253        def _download_nonexistent_uri(res):
254            baduri = self.mangle_uri(self.uri)
255            badnode = self.clients[1].create_node_from_uri(baduri)
256            log.msg("about to download non-existent URI", level=log.UNUSUAL,
257                    facility="tahoe.tests")
258            d1 = download_to_data(badnode)
259            def _baduri_should_fail(res):
260                log.msg("finished downloading non-existent URI",
261                        level=log.UNUSUAL, facility="tahoe.tests")
262                self.failUnless(isinstance(res, Failure))
263                self.failUnless(res.check(NoSharesError),
264                                "expected NoSharesError, got %s" % res)
265            d1.addBoth(_baduri_should_fail)
266            return d1
267        d.addCallback(_download_nonexistent_uri)
268
269        # add a new node, which doesn't accept shares, and only uses the
270        # helper for upload.
271        d.addCallback(lambda res: self.add_extra_node(self.numclients,
272                                                      self.helper_furl,
273                                                      add_to_sparent=True))
274        def _added(extra_node):
275            self.extra_node = extra_node
276            self.extra_node.encoding_params['happy'] = 5
277        d.addCallback(_added)
278
279        def _has_helper():
280            uploader = self.extra_node.getServiceNamed("uploader")
281            furl, connected = uploader.get_helper_info()
282            return connected
283        d.addCallback(lambda ign: self.poll(_has_helper))
284
285        HELPER_DATA = b"Data that needs help to upload" * 1000
286        def _upload_with_helper(res):
287            u = upload.Data(HELPER_DATA, convergence=convergence)
288            d = self.extra_node.upload(u)
289            def _uploaded(results):
290                n = self.clients[1].create_node_from_uri(results.get_uri())
291                return download_to_data(n)
292            d.addCallback(_uploaded)
293            def _check(newdata):
294                self.failUnlessEqual(newdata, HELPER_DATA)
295            d.addCallback(_check)
296            return d
297        d.addCallback(_upload_with_helper)
298
299        def _upload_duplicate_with_helper(res):
300            u = upload.Data(HELPER_DATA, convergence=convergence)
301            u.debug_stash_RemoteEncryptedUploadable = True
302            d = self.extra_node.upload(u)
303            def _uploaded(results):
304                n = self.clients[1].create_node_from_uri(results.get_uri())
305                return download_to_data(n)
306            d.addCallback(_uploaded)
307            def _check(newdata):
308                self.failUnlessEqual(newdata, HELPER_DATA)
309                self.failIf(hasattr(u, "debug_RemoteEncryptedUploadable"),
310                            "uploadable started uploading, should have been avoided")
311            d.addCallback(_check)
312            return d
313        if convergence is not None:
314            d.addCallback(_upload_duplicate_with_helper)
315
316        d.addCallback(fireEventually)
317
318        def _upload_resumable(res):
319            DATA = b"Data that needs help to upload and gets interrupted" * 1000
320            u1 = CountingDataUploadable(DATA, convergence=convergence)
321            u2 = CountingDataUploadable(DATA, convergence=convergence)
322
323            # we interrupt the connection after about 5kB by shutting down
324            # the helper, then restarting it.
325            u1.interrupt_after = 5000
326            u1.interrupt_after_d = defer.Deferred()
327            bounced_d = defer.Deferred()
328            def _do_bounce(res):
329                d = self.bounce_client(0)
330                d.addBoth(bounced_d.callback)
331            u1.interrupt_after_d.addCallback(_do_bounce)
332
333            # sneak into the helper and reduce its chunk size, so that our
334            # debug_interrupt will sever the connection on about the fifth
335            # chunk fetched. This makes sure that we've started to write the
336            # new shares before we abandon them, which exercises the
337            # abort/delete-partial-share code. TODO: find a cleaner way to do
338            # this. I know that this will affect later uses of the helper in
339            # this same test run, but I'm not currently worried about it.
340            offloaded.CHKCiphertextFetcher.CHUNK_SIZE = 1000
341
342            upload_d = self.extra_node.upload(u1)
343            # The upload will start, and bounce_client() will be called after
344            # about 5kB. bounced_d will fire after bounce_client() finishes
345            # shutting down and restarting the node.
346            d = bounced_d
347            def _bounced(ign):
348                # By this point, the upload should have failed because of the
349                # interruption. upload_d will fire in a moment
350                def _should_not_finish(res):
351                    self.fail("interrupted upload should have failed, not"
352                              " finished with result %s" % (res,))
353                def _interrupted(f):
354                    f.trap(DeadReferenceError)
355                    # make sure we actually interrupted it before finishing
356                    # the file
357                    self.failUnless(u1.bytes_read < len(DATA),
358                                    "read %d out of %d total" %
359                                    (u1.bytes_read, len(DATA)))
360                upload_d.addCallbacks(_should_not_finish, _interrupted)
361                return upload_d
362            d.addCallback(_bounced)
363
364            def _disconnected(res):
365                # check to make sure the storage servers aren't still hanging
366                # on to the partial share: their incoming/ directories should
367                # now be empty.
368                log.msg("disconnected", level=log.NOISY,
369                        facility="tahoe.test.test_system")
370                for i in range(self.numclients):
371                    incdir = os.path.join(self.getdir("client%d" % i),
372                                          "storage", "shares", "incoming")
373                    self.failIf(os.path.exists(incdir) and os.listdir(incdir))
374            d.addCallback(_disconnected)
375
376            d.addCallback(lambda res:
377                          log.msg("wait_for_helper", level=log.NOISY,
378                                  facility="tahoe.test.test_system"))
379            # then we need to wait for the extra node to reestablish its
380            # connection to the helper.
381            d.addCallback(lambda ign: self.poll(_has_helper))
382
383            d.addCallback(lambda res:
384                          log.msg("uploading again", level=log.NOISY,
385                                  facility="tahoe.test.test_system"))
386            d.addCallback(lambda res: self.extra_node.upload(u2))
387
388            def _uploaded(results):
389                cap = results.get_uri()
390                log.msg("Second upload complete", level=log.NOISY,
391                        facility="tahoe.test.test_system")
392
393                # this is really bytes received rather than sent, but it's
394                # convenient and basically measures the same thing
395                bytes_sent = results.get_ciphertext_fetched()
396                self.failUnless(isinstance(bytes_sent, int), bytes_sent)
397
398                # We currently don't support resumption of upload if the data is
399                # encrypted with a random key.  (Because that would require us
400                # to store the key locally and re-use it on the next upload of
401                # this file, which isn't a bad thing to do, but we currently
402                # don't do it.)
403                if convergence is not None:
404                    # Make sure we did not have to read the whole file the
405                    # second time around .
406                    self.failUnless(bytes_sent < len(DATA),
407                                    "resumption didn't save us any work:"
408                                    " read %r bytes out of %r total" %
409                                    (bytes_sent, len(DATA)))
410                else:
411                    # Make sure we did have to read the whole file the second
412                    # time around -- because the one that we partially uploaded
413                    # earlier was encrypted with a different random key.
414                    self.failIf(bytes_sent < len(DATA),
415                                "resumption saved us some work even though we were using random keys:"
416                                " read %r bytes out of %r total" %
417                                (bytes_sent, len(DATA)))
418                n = self.clients[1].create_node_from_uri(cap)
419                return download_to_data(n)
420            d.addCallback(_uploaded)
421
422            def _check(newdata):
423                self.failUnlessEqual(newdata, DATA)
424                # If using convergent encryption, then also check that the
425                # helper has removed the temp file from its directories.
426                if convergence is not None:
427                    basedir = os.path.join(self.getdir("client0"), "helper")
428                    files = os.listdir(os.path.join(basedir, "CHK_encoding"))
429                    self.failUnlessEqual(files, [])
430                    files = os.listdir(os.path.join(basedir, "CHK_incoming"))
431                    self.failUnlessEqual(files, [])
432            d.addCallback(_check)
433            return d
434        d.addCallback(_upload_resumable)
435
436        def _grab_stats(ignored):
437            stats = self.clients[0].stats_provider.get_stats()
438            s = stats["stats"]
439            self.failUnlessEqual(s["storage_server.accepting_immutable_shares"], 1)
440            c = stats["counters"]
441            self.failUnless("storage_server.allocate" in c)
442        d.addCallback(_grab_stats)
443
444        return d
445
446    def _find_all_shares(self, basedir):
447        shares = []
448        for (dirpath, dirnames, filenames) in os.walk(basedir):
449            if "storage" not in dirpath:
450                continue
451            if not filenames:
452                continue
453            pieces = dirpath.split(os.sep)
454            if (len(pieces) >= 5
455                and pieces[-4] == "storage"
456                and pieces[-3] == "shares"):
457                # we're sitting in .../storage/shares/$START/$SINDEX , and there
458                # are sharefiles here
459                assert pieces[-5].startswith("client")
460                client_num = int(pieces[-5][-1])
461                storage_index_s = pieces[-1]
462                storage_index = si_a2b(storage_index_s.encode("ascii"))
463                for sharename in filenames:
464                    shnum = int(sharename)
465                    filename = os.path.join(dirpath, sharename)
466                    data = (client_num, storage_index, filename, shnum)
467                    shares.append(data)
468        if not shares:
469            self.fail("unable to find any share files in %s" % basedir)
470        return shares
471
472    def _corrupt_mutable_share(self, filename, which):
473        msf = MutableShareFile(filename)
474        # Read more than share length:
475        datav = msf.readv([ (0, 10_000_000) ])
476        final_share = datav[0]
477        assert len(final_share) < 10_000_000 # ought to be truncated
478        pieces = mutable_layout.unpack_share(final_share)
479        (seqnum, root_hash, IV, k, N, segsize, datalen,
480         verification_key, signature, share_hash_chain, block_hash_tree,
481         share_data, enc_privkey) = pieces
482
483        if which == "seqnum":
484            seqnum = seqnum + 15
485        elif which == "R":
486            root_hash = self.flip_bit(root_hash)
487        elif which == "IV":
488            IV = self.flip_bit(IV)
489        elif which == "segsize":
490            segsize = segsize + 15
491        elif which == "pubkey":
492            verification_key = self.flip_bit(verification_key)
493        elif which == "signature":
494            signature = self.flip_bit(signature)
495        elif which == "share_hash_chain":
496            nodenum = list(share_hash_chain.keys())[0]
497            share_hash_chain[nodenum] = self.flip_bit(share_hash_chain[nodenum])
498        elif which == "block_hash_tree":
499            block_hash_tree[-1] = self.flip_bit(block_hash_tree[-1])
500        elif which == "share_data":
501            share_data = self.flip_bit(share_data)
502        elif which == "encprivkey":
503            enc_privkey = self.flip_bit(enc_privkey)
504
505        prefix = mutable_layout.pack_prefix(seqnum, root_hash, IV, k, N,
506                                            segsize, datalen)
507        final_share = mutable_layout.pack_share(prefix,
508                                                verification_key,
509                                                signature,
510                                                share_hash_chain,
511                                                block_hash_tree,
512                                                share_data,
513                                                enc_privkey)
514        msf.writev( [(0, final_share)], None)
515
516
517    def test_mutable_sdmf(self):
518        """SDMF mutables can be uploaded, downloaded, and many other things."""
519        return self._test_mutable(SDMF_VERSION)
520
521    def test_mutable_mdmf(self):
522        """MDMF mutables can be uploaded, downloaded, and many other things."""
523        return self._test_mutable(MDMF_VERSION)
524
525    def _test_mutable(self, mutable_version):
526        DATA = b"initial contents go here."  # 25 bytes % 3 != 0
527        DATA_uploadable = MutableData(DATA)
528        NEWDATA = b"new contents yay"
529        NEWDATA_uploadable = MutableData(NEWDATA)
530        NEWERDATA = b"this is getting old" * 1_000_000
531        NEWERDATA_uploadable = MutableData(NEWERDATA)
532
533        d = self.set_up_nodes()
534
535        def _create_mutable(res):
536            c = self.clients[0]
537            log.msg("starting create_mutable_file")
538            d1 = c.create_mutable_file(DATA_uploadable, mutable_version)
539            def _done(res):
540                log.msg("DONE: %s" % (res,))
541                self._mutable_node_1 = res
542            d1.addCallback(_done)
543            return d1
544        d.addCallback(_create_mutable)
545
546        @defer.inlineCallbacks
547        def _test_debug(res):
548            # find a share. It is important to run this while there is only
549            # one slot in the grid.
550            shares = self._find_all_shares(self.basedir)
551            (client_num, storage_index, filename, shnum) = shares[0]
552            log.msg("test_system.SystemTest.test_mutable._test_debug using %s"
553                    % filename)
554            log.msg(" for clients[%d]" % client_num)
555
556            rc,output,err = yield run_cli("debug", "dump-share", "--offsets",
557                                          filename)
558            self.failUnlessEqual(rc, 0)
559            try:
560                share_type = 'SDMF' if mutable_version == SDMF_VERSION else 'MDMF'
561                self.failUnless("Mutable slot found:\n" in output)
562                self.assertIn(f"share_type: {share_type}\n", output)
563                peerid = idlib.nodeid_b2a(self.clients[client_num].nodeid)
564                self.failUnless(" WE for nodeid: %s\n" % peerid in output)
565                self.failUnless(" num_extra_leases: 0\n" in output)
566                self.failUnless("  secrets are for nodeid: %s\n" % peerid
567                                in output)
568                self.failUnless(f" {share_type} contents:\n" in output)
569                self.failUnless("  seqnum: 1\n" in output)
570                self.failUnless("  required_shares: 3\n" in output)
571                self.failUnless("  total_shares: 10\n" in output)
572                if mutable_version == SDMF_VERSION:
573                    self.failUnless("  segsize: 27\n" in output, (output, filename))
574                self.failUnless("  datalen: 25\n" in output)
575                # the exact share_hash_chain nodes depends upon the sharenum,
576                # and is more of a hassle to compute than I want to deal with
577                # now
578                self.failUnless("  share_hash_chain: " in output)
579                self.failUnless("  block_hash_tree: 1 nodes\n" in output)
580                if mutable_version == SDMF_VERSION:
581                    expected = ("  verify-cap: URI:SSK-Verifier:%s:" %
582                                str(base32.b2a(storage_index), "ascii"))
583                else:
584                    expected = ("  verify-cap: URI:MDMF-Verifier:%s" %
585                                str(base32.b2a(storage_index), "ascii"))
586                self.assertIn(expected, output)
587            except unittest.FailTest:
588                print()
589                print("dump-share output was:")
590                print(output)
591                raise
592        d.addCallback(_test_debug)
593
594        # test retrieval
595
596        # first, let's see if we can use the existing node to retrieve the
597        # contents. This allows it to use the cached pubkey and maybe the
598        # latest-known sharemap.
599
600        d.addCallback(lambda res: self._mutable_node_1.download_best_version())
601        def _check_download_1(res):
602            self.failUnlessEqual(res, DATA)
603            # now we see if we can retrieve the data from a new node,
604            # constructed using the URI of the original one. We do this test
605            # on the same client that uploaded the data.
606            uri = self._mutable_node_1.get_uri()
607            log.msg("starting retrieve1")
608            newnode = self.clients[0].create_node_from_uri(uri)
609            newnode_2 = self.clients[0].create_node_from_uri(uri)
610            self.failUnlessIdentical(newnode, newnode_2)
611            return newnode.download_best_version()
612        d.addCallback(_check_download_1)
613
614        def _check_download_2(res):
615            self.failUnlessEqual(res, DATA)
616            # same thing, but with a different client
617            uri = self._mutable_node_1.get_uri()
618            newnode = self.clients[1].create_node_from_uri(uri)
619            log.msg("starting retrieve2")
620            d1 = newnode.download_best_version()
621            d1.addCallback(lambda res: (res, newnode))
622            return d1
623        d.addCallback(_check_download_2)
624
625        def _check_download_3(res_and_newnode):
626            (res, newnode) = res_and_newnode
627            self.failUnlessEqual(res, DATA)
628            # replace the data
629            log.msg("starting replace1")
630            d1 = newnode.overwrite(NEWDATA_uploadable)
631            d1.addCallback(lambda res: newnode.download_best_version())
632            return d1
633        d.addCallback(_check_download_3)
634
635        def _check_download_4(res):
636            self.failUnlessEqual(res, NEWDATA)
637            # now create an even newer node and replace the data on it. This
638            # new node has never been used for download before.
639            uri = self._mutable_node_1.get_uri()
640            newnode1 = self.clients[2].create_node_from_uri(uri)
641            newnode2 = self.clients[3].create_node_from_uri(uri)
642            self._newnode3 = self.clients[3].create_node_from_uri(uri)
643            log.msg("starting replace2")
644            d1 = newnode1.overwrite(NEWERDATA_uploadable)
645            d1.addCallback(lambda res: newnode2.download_best_version())
646            return d1
647        d.addCallback(_check_download_4)
648
649        def _check_download_5(res):
650            log.msg("finished replace2")
651            self.failUnlessEqual(res, NEWERDATA)
652        d.addCallback(_check_download_5)
653
654        # The previous checks upload a complete replacement. This uses a
655        # different API that is supposed to do a partial write at an offset.
656        @async_to_deferred
657        async def _check_write_at_offset(newnode):
658            log.msg("writing at offset")
659            start = b"abcdef"
660            expected = b"abXYef"
661            uri = self._mutable_node_1.get_uri()
662            newnode = self.clients[0].create_node_from_uri(uri)
663            await newnode.overwrite(MutableData(start))
664            version = await newnode.get_mutable_version()
665            await version.update(MutableData(b"XY"), 2)
666            result = await newnode.download_best_version()
667            self.assertEqual(result, expected)
668            # Revert to previous version
669            await newnode.overwrite(MutableData(NEWERDATA))
670        d.addCallback(_check_write_at_offset)
671
672        def _corrupt_shares(_res):
673            # run around and flip bits in all but k of the shares, to test
674            # the hash checks
675            shares = self._find_all_shares(self.basedir)
676            ## sort by share number
677            #shares.sort( lambda a,b: cmp(a[3], b[3]) )
678            where = dict([ (shnum, filename)
679                           for (client_num, storage_index, filename, shnum)
680                           in shares ])
681            assert len(where) == 10 # this test is designed for 3-of-10
682            for shnum, filename in list(where.items()):
683                # shares 7,8,9 are left alone. read will check
684                # (share_hash_chain, block_hash_tree, share_data). New
685                # seqnum+R pairs will trigger a check of (seqnum, R, IV,
686                # segsize, signature).
687                if shnum == 0:
688                    # read: this will trigger "pubkey doesn't match
689                    # fingerprint".
690                    self._corrupt_mutable_share(filename, "pubkey")
691                    self._corrupt_mutable_share(filename, "encprivkey")
692                elif shnum == 1:
693                    # triggers "signature is invalid"
694                    self._corrupt_mutable_share(filename, "seqnum")
695                elif shnum == 2:
696                    # triggers "signature is invalid"
697                    self._corrupt_mutable_share(filename, "R")
698                elif shnum == 3:
699                    # triggers "signature is invalid"
700                    self._corrupt_mutable_share(filename, "segsize")
701                elif shnum == 4:
702                    self._corrupt_mutable_share(filename, "share_hash_chain")
703                elif shnum == 5:
704                    self._corrupt_mutable_share(filename, "block_hash_tree")
705                elif shnum == 6:
706                    self._corrupt_mutable_share(filename, "share_data")
707                # other things to correct: IV, signature
708                # 7,8,9 are left alone
709
710                # note that initial_query_count=5 means that we'll hit the
711                # first 5 servers in effectively random order (based upon
712                # response time), so we won't necessarily ever get a "pubkey
713                # doesn't match fingerprint" error (if we hit shnum>=1 before
714                # shnum=0, we pull the pubkey from there). To get repeatable
715                # specific failures, we need to set initial_query_count=1,
716                # but of course that will change the sequencing behavior of
717                # the retrieval process. TODO: find a reasonable way to make
718                # this a parameter, probably when we expand this test to test
719                # for one failure mode at a time.
720
721                # when we retrieve this, we should get three signature
722                # failures (where we've mangled seqnum, R, and segsize). The
723                # pubkey mangling
724
725        if mutable_version == SDMF_VERSION:
726            # TODO Corrupting shares in test_systm doesn't work for MDMF right now
727            d.addCallback(_corrupt_shares)
728
729        d.addCallback(lambda res: self._newnode3.download_best_version())
730        d.addCallback(_check_download_5)
731
732        def _check_empty_file(res):
733            # make sure we can create empty files, this usually screws up the
734            # segsize math
735            d1 = self.clients[2].create_mutable_file(MutableData(b""), mutable_version)
736            d1.addCallback(lambda newnode: newnode.download_best_version())
737            d1.addCallback(lambda res: self.failUnlessEqual(b"", res))
738            return d1
739        d.addCallback(_check_empty_file)
740
741        d.addCallback(lambda res: self.clients[0].create_dirnode())
742        def _created_dirnode(dnode):
743            log.msg("_created_dirnode(%s)" % (dnode,))
744            d1 = dnode.list()
745            d1.addCallback(lambda children: self.failUnlessEqual(children, {}))
746            d1.addCallback(lambda res: dnode.has_child(u"edgar"))
747            d1.addCallback(lambda answer: self.failUnlessEqual(answer, False))
748            d1.addCallback(lambda res: dnode.set_node(u"see recursive", dnode))
749            d1.addCallback(lambda res: dnode.has_child(u"see recursive"))
750            d1.addCallback(lambda answer: self.failUnlessEqual(answer, True))
751            d1.addCallback(lambda res: dnode.build_manifest().when_done())
752            d1.addCallback(lambda res:
753                           self.failUnlessEqual(len(res["manifest"]), 1))
754            return d1
755        d.addCallback(_created_dirnode)
756
757        return d
758
759    def flip_bit(self, good):
760        return good[:-1] + byteschr(ord(good[-1:]) ^ 0x01)
761
762    def mangle_uri(self, gooduri):
763        # change the key, which changes the storage index, which means we'll
764        # be asking about the wrong file, so nobody will have any shares
765        u = uri.from_string(gooduri)
766        u2 = uri.CHKFileURI(key=self.flip_bit(u.key),
767                            uri_extension_hash=u.uri_extension_hash,
768                            needed_shares=u.needed_shares,
769                            total_shares=u.total_shares,
770                            size=u.size)
771        return u2.to_string()
772
773    # TODO: add a test which mangles the uri_extension_hash instead, and
774    # should fail due to not being able to get a valid uri_extension block.
775    # Also a test which sneakily mangles the uri_extension block to change
776    # some of the validation data, so it will fail in the post-download phase
777    # when the file's crypttext integrity check fails. Do the same thing for
778    # the key, which should cause the download to fail the post-download
779    # plaintext_hash check.
780
781    def test_filesystem(self):
782        self.data = LARGE_DATA
783        d = self.set_up_nodes(2)
784        def _new_happy_semantics(ign):
785            for c in self.clients:
786                c.encoding_params['happy'] = 1
787        d.addCallback(_new_happy_semantics)
788        d.addCallback(self.log, "starting publish")
789        d.addCallback(self._do_publish1)
790        d.addCallback(self._test_runner)
791        d.addCallback(self._do_publish2)
792        # at this point, we have the following filesystem (where "R" denotes
793        # self._root_directory_uri):
794        # R
795        # R/subdir1
796        # R/subdir1/mydata567
797        # R/subdir1/subdir2/
798        # R/subdir1/subdir2/mydata992
799
800        d.addCallback(lambda res: self.bounce_client(0))
801        d.addCallback(self.log, "bounced client0")
802
803        d.addCallback(self._check_publish1)
804        d.addCallback(self.log, "did _check_publish1")
805        d.addCallback(self._check_publish2)
806        d.addCallback(self.log, "did _check_publish2")
807        d.addCallback(self._do_publish_private)
808        d.addCallback(self.log, "did _do_publish_private")
809        # now we also have (where "P" denotes a new dir):
810        #  P/personal/sekrit data
811        #  P/s2-rw -> /subdir1/subdir2/
812        #  P/s2-ro -> /subdir1/subdir2/ (read-only)
813        d.addCallback(self._check_publish_private)
814        d.addCallback(self.log, "did _check_publish_private")
815        d.addCallback(self._test_web)
816        d.addCallback(self._test_cli)
817        # P now has four top-level children:
818        # P/personal/sekrit data
819        # P/s2-ro/
820        # P/s2-rw/
821        # P/test_put/  (empty)
822        d.addCallback(self._test_checker)
823        return d
824
825    def _do_publish1(self, res):
826        ut = upload.Data(self.data, convergence=None)
827        c0 = self.clients[0]
828        d = c0.create_dirnode()
829        def _made_root(new_dirnode):
830            self._root_directory_uri = new_dirnode.get_uri()
831            return c0.create_node_from_uri(self._root_directory_uri)
832        d.addCallback(_made_root)
833        d.addCallback(lambda root: root.create_subdirectory(u"subdir1"))
834        def _made_subdir1(subdir1_node):
835            self._subdir1_node = subdir1_node
836            d1 = subdir1_node.add_file(u"mydata567", ut)
837            d1.addCallback(self.log, "publish finished")
838            def _stash_uri(filenode):
839                self.uri = filenode.get_uri()
840                assert isinstance(self.uri, bytes), (self.uri, filenode)
841            d1.addCallback(_stash_uri)
842            return d1
843        d.addCallback(_made_subdir1)
844        return d
845
846    def _do_publish2(self, res):
847        ut = upload.Data(self.data, convergence=None)
848        d = self._subdir1_node.create_subdirectory(u"subdir2")
849        d.addCallback(lambda subdir2: subdir2.add_file(u"mydata992", ut))
850        return d
851
852    def log(self, res, *args, **kwargs):
853        # print("MSG: %s  RES: %s" % (msg, args))
854        log.msg(*args, **kwargs)
855        return res
856
857    def _do_publish_private(self, res):
858        self.smalldata = b"sssh, very secret stuff"
859        ut = upload.Data(self.smalldata, convergence=None)
860        d = self.clients[0].create_dirnode()
861        d.addCallback(self.log, "GOT private directory")
862        def _got_new_dir(privnode):
863            rootnode = self.clients[0].create_node_from_uri(self._root_directory_uri)
864            d1 = privnode.create_subdirectory(u"personal")
865            d1.addCallback(self.log, "made P/personal")
866            d1.addCallback(lambda node: node.add_file(u"sekrit data", ut))
867            d1.addCallback(self.log, "made P/personal/sekrit data")
868            d1.addCallback(lambda res: rootnode.get_child_at_path([u"subdir1", u"subdir2"]))
869            def _got_s2(s2node):
870                d2 = privnode.set_uri(u"s2-rw", s2node.get_uri(),
871                                      s2node.get_readonly_uri())
872                d2.addCallback(lambda node:
873                               privnode.set_uri(u"s2-ro",
874                                                s2node.get_readonly_uri(),
875                                                s2node.get_readonly_uri()))
876                return d2
877            d1.addCallback(_got_s2)
878            d1.addCallback(lambda res: privnode)
879            return d1
880        d.addCallback(_got_new_dir)
881        return d
882
883    def _check_publish1(self, res):
884        # this one uses the iterative API
885        c1 = self.clients[1]
886        d = defer.succeed(c1.create_node_from_uri(self._root_directory_uri))
887        d.addCallback(self.log, "check_publish1 got /")
888        d.addCallback(lambda root: root.get(u"subdir1"))
889        d.addCallback(lambda subdir1: subdir1.get(u"mydata567"))
890        d.addCallback(lambda filenode: download_to_data(filenode))
891        d.addCallback(self.log, "get finished")
892        def _get_done(data):
893            self.failUnlessEqual(data, self.data)
894        d.addCallback(_get_done)
895        return d
896
897    def _check_publish2(self, res):
898        # this one uses the path-based API
899        rootnode = self.clients[1].create_node_from_uri(self._root_directory_uri)
900        d = rootnode.get_child_at_path(u"subdir1")
901        d.addCallback(lambda dirnode:
902                      self.failUnless(IDirectoryNode.providedBy(dirnode)))
903        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
904        d.addCallback(lambda filenode: download_to_data(filenode))
905        d.addCallback(lambda data: self.failUnlessEqual(data, self.data))
906
907        d.addCallback(lambda res: rootnode.get_child_at_path(u"subdir1/mydata567"))
908        def _got_filenode(filenode):
909            fnode = self.clients[1].create_node_from_uri(filenode.get_uri())
910            assert fnode == filenode
911        d.addCallback(_got_filenode)
912        return d
913
914    def _check_publish_private(self, resnode):
915        # this one uses the path-based API
916        self._private_node = resnode
917
918        d = self._private_node.get_child_at_path(u"personal")
919        def _got_personal(personal):
920            self._personal_node = personal
921            return personal
922        d.addCallback(_got_personal)
923
924        d.addCallback(lambda dirnode:
925                      self.failUnless(IDirectoryNode.providedBy(dirnode), dirnode))
926        def get_path(path):
927            return self._private_node.get_child_at_path(path)
928
929        d.addCallback(lambda res: get_path(u"personal/sekrit data"))
930        d.addCallback(lambda filenode: download_to_data(filenode))
931        d.addCallback(lambda data: self.failUnlessEqual(data, self.smalldata))
932        d.addCallback(lambda res: get_path(u"s2-rw"))
933        d.addCallback(lambda dirnode: self.failUnless(dirnode.is_mutable()))
934        d.addCallback(lambda res: get_path(u"s2-ro"))
935        def _got_s2ro(dirnode):
936            self.failUnless(dirnode.is_mutable(), dirnode)
937            self.failUnless(dirnode.is_readonly(), dirnode)
938            d1 = defer.succeed(None)
939            d1.addCallback(lambda res: dirnode.list())
940            d1.addCallback(self.log, "dirnode.list")
941
942            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mkdir(nope)", None, dirnode.create_subdirectory, u"nope"))
943
944            d1.addCallback(self.log, "doing add_file(ro)")
945            ut = upload.Data(b"I will disappear, unrecorded and unobserved. The tragedy of my demise is made more poignant by its silence, but this beauty is not for you to ever know.", convergence=b"99i-p1x4-xd4-18yc-ywt-87uu-msu-zo -- completely and totally unguessable string (unless you read this)")
946            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "add_file(nope)", None, dirnode.add_file, u"hope", ut))
947
948            d1.addCallback(self.log, "doing get(ro)")
949            d1.addCallback(lambda res: dirnode.get(u"mydata992"))
950            d1.addCallback(lambda filenode:
951                           self.failUnless(IFileNode.providedBy(filenode)))
952
953            d1.addCallback(self.log, "doing delete(ro)")
954            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "delete(nope)", None, dirnode.delete, u"mydata992"))
955
956            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "set_uri(nope)", None, dirnode.set_uri, u"hopeless", self.uri, self.uri))
957
958            d1.addCallback(lambda res: self.shouldFail2(NoSuchChildError, "get(missing)", "missing", dirnode.get, u"missing"))
959
960            personal = self._personal_node
961            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv from readonly", None, dirnode.move_child_to, u"mydata992", personal, u"nope"))
962
963            d1.addCallback(self.log, "doing move_child_to(ro)2")
964            d1.addCallback(lambda res: self.shouldFail2(NotWriteableError, "mv to readonly", None, personal.move_child_to, u"sekrit data", dirnode, u"nope"))
965
966            d1.addCallback(self.log, "finished with _got_s2ro")
967            return d1
968        d.addCallback(_got_s2ro)
969        def _got_home(dummy):
970            home = self._private_node
971            personal = self._personal_node
972            d1 = defer.succeed(None)
973            d1.addCallback(self.log, "mv 'P/personal/sekrit data' to P/sekrit")
974            d1.addCallback(lambda res:
975                           personal.move_child_to(u"sekrit data",home,u"sekrit"))
976
977            d1.addCallback(self.log, "mv P/sekrit 'P/sekrit data'")
978            d1.addCallback(lambda res:
979                           home.move_child_to(u"sekrit", home, u"sekrit data"))
980
981            d1.addCallback(self.log, "mv 'P/sekret data' P/personal/")
982            d1.addCallback(lambda res:
983                           home.move_child_to(u"sekrit data", personal))
984
985            d1.addCallback(lambda res: home.build_manifest().when_done())
986            d1.addCallback(self.log, "manifest")
987            #  five items:
988            # P/
989            # P/personal/
990            # P/personal/sekrit data
991            # P/s2-rw  (same as P/s2-ro)
992            # P/s2-rw/mydata992 (same as P/s2-rw/mydata992)
993            d1.addCallback(lambda res:
994                           self.failUnlessEqual(len(res["manifest"]), 5))
995            d1.addCallback(lambda res: home.start_deep_stats().when_done())
996            def _check_stats(stats):
997                expected = {"count-immutable-files": 1,
998                            "count-mutable-files": 0,
999                            "count-literal-files": 1,
1000                            "count-files": 2,
1001                            "count-directories": 3,
1002                            "size-immutable-files": 112,
1003                            "size-literal-files": 23,
1004                            #"size-directories": 616, # varies
1005                            #"largest-directory": 616,
1006                            "largest-directory-children": 3,
1007                            "largest-immutable-file": 112,
1008                            }
1009                for k,v in list(expected.items()):
1010                    self.failUnlessEqual(stats[k], v,
1011                                         "stats[%s] was %s, not %s" %
1012                                         (k, stats[k], v))
1013                self.failUnless(stats["size-directories"] > 1300,
1014                                stats["size-directories"])
1015                self.failUnless(stats["largest-directory"] > 800,
1016                                stats["largest-directory"])
1017                self.failUnlessEqual(stats["size-files-histogram"],
1018                                     [ (11, 31, 1), (101, 316, 1) ])
1019            d1.addCallback(_check_stats)
1020            return d1
1021        d.addCallback(_got_home)
1022        return d
1023
1024    def shouldFail(self, res, expected_failure, which, substring=None):
1025        if isinstance(res, Failure):
1026            res.trap(expected_failure)
1027            if substring:
1028                self.failUnless(substring in str(res),
1029                                "substring '%s' not in '%s'"
1030                                % (substring, str(res)))
1031        else:
1032            self.fail("%s was supposed to raise %s, not get '%s'" %
1033                      (which, expected_failure, res))
1034
1035    def shouldFail2(self, expected_failure, which, substring, callable, *args, **kwargs):
1036        assert substring is None or isinstance(substring, str)
1037        d = defer.maybeDeferred(callable, *args, **kwargs)
1038        def done(res):
1039            if isinstance(res, Failure):
1040                res.trap(expected_failure)
1041                if substring:
1042                    self.failUnless(substring in str(res),
1043                                    "substring '%s' not in '%s'"
1044                                    % (substring, str(res)))
1045            else:
1046                self.fail("%s was supposed to raise %s, not get '%s'" %
1047                          (which, expected_failure, res))
1048        d.addBoth(done)
1049        return d
1050
1051    def PUT(self, urlpath, data):
1052        return do_http("put", self.webish_url + urlpath, data=data)
1053
1054    def GET(self, urlpath):
1055        return do_http("get", self.webish_url + urlpath)
1056
1057    def POST(self, urlpath, use_helper=False, **fields):
1058        sepbase = b"boogabooga"
1059        sep = b"--" + sepbase
1060        form = []
1061        form.append(sep)
1062        form.append(b'Content-Disposition: form-data; name="_charset"')
1063        form.append(b'')
1064        form.append(b'UTF-8')
1065        form.append(sep)
1066        for name, value in fields.items():
1067            if isinstance(value, tuple):
1068                filename, value = value
1069                form.append(b'Content-Disposition: form-data; name="%s"; '
1070                            b'filename="%s"' % (name.encode("utf-8"), filename.encode("utf-8")))
1071            else:
1072                form.append(b'Content-Disposition: form-data; name="%s"' % name.encode("utf-8"))
1073            form.append(b'')
1074            form.append(b"%s" % (value,))
1075            form.append(sep)
1076        form[-1] += b"--"
1077        body = b""
1078        headers = {}
1079        if fields:
1080            body = b"\r\n".join(form) + b"\r\n"
1081            headers["content-type"] = "multipart/form-data; boundary=%s" % str(sepbase, "ascii")
1082        return self.POST2(urlpath, body, headers, use_helper)
1083
1084    def POST2(self, urlpath, body=b"", headers=None, use_helper=False):
1085        if headers is None:
1086            headers = {}
1087        if use_helper:
1088            url = self.helper_webish_url + urlpath
1089        else:
1090            url = self.webish_url + urlpath
1091        return do_http("post", url, data=body, headers=headers)
1092
1093    def _test_web(self, res):
1094        public = "uri/" + str(self._root_directory_uri, "ascii")
1095        d = self.GET("")
1096        def _got_welcome(page):
1097            html = page.replace('\n', ' ')
1098            connected_re = r'Connected to <span>%d</span>\s*of <span>%d</span> known storage servers' % (self.numclients, self.numclients)
1099            self.failUnless(re.search(connected_re, html),
1100                            "I didn't see the right '%s' message in:\n%s" % (connected_re, page))
1101            # nodeids/tubids don't have any regexp-special characters
1102            nodeid_re = r'<th>Node ID:</th>\s*<td title="TubID: %s">%s</td>' % (
1103                self.clients[0].get_long_tubid(), str(self.clients[0].get_long_nodeid(), "ascii"))
1104            self.failUnless(re.search(nodeid_re, html),
1105                            "I didn't see the right '%s' message in:\n%s" % (nodeid_re, page))
1106            self.failUnless("Helper: 0 active uploads" in page)
1107        d.addCallback(_got_welcome)
1108        d.addCallback(self.log, "done with _got_welcome")
1109
1110        # get the welcome page from the node that uses the helper too
1111        d.addCallback(lambda res: do_http("get", self.helper_webish_url))
1112        def _got_welcome_helper(page):
1113            soup = BeautifulSoup(page, 'html5lib')
1114            assert_soup_has_tag_with_attributes(
1115                self, soup, u"img",
1116                { u"alt": u"Connected", u"src": u"img/connected-yes.png" }
1117            )
1118            self.failUnlessIn("Not running helper", page)
1119        d.addCallback(_got_welcome_helper)
1120
1121        d.addCallback(lambda res: self.GET(public))
1122        d.addCallback(lambda res: self.GET(public + "/subdir1"))
1123        def _got_subdir1(page):
1124            # there ought to be an href for our file
1125            self.failUnlessIn('<td align="right">%d</td>' % len(self.data), page)
1126            self.failUnless(">mydata567</a>" in page)
1127        d.addCallback(_got_subdir1)
1128        d.addCallback(self.log, "done with _got_subdir1")
1129        d.addCallback(lambda res: self.GET(public + "/subdir1/mydata567"))
1130        def _got_data(page):
1131            self.failUnlessEqual(page.encode("utf-8"), self.data)
1132        d.addCallback(_got_data)
1133
1134        # download from a URI embedded in a URL
1135        d.addCallback(self.log, "_get_from_uri")
1136        def _get_from_uri(res):
1137            return self.GET("uri/%s?filename=%s" % (str(self.uri, "utf-8"), "mydata567"))
1138        d.addCallback(_get_from_uri)
1139        def _got_from_uri(page):
1140            self.failUnlessEqual(page.encode("utf-8"), self.data)
1141        d.addCallback(_got_from_uri)
1142
1143        # download from a URI embedded in a URL, second form
1144        d.addCallback(self.log, "_get_from_uri2")
1145        def _get_from_uri2(res):
1146            return self.GET("uri?uri=%s" % (str(self.uri, "utf-8"),))
1147        d.addCallback(_get_from_uri2)
1148        d.addCallback(_got_from_uri)
1149
1150        # download from a bogus URI, make sure we get a reasonable error
1151        d.addCallback(self.log, "_get_from_bogus_uri", level=log.UNUSUAL)
1152        @defer.inlineCallbacks
1153        def _get_from_bogus_uri(res):
1154            d1 = self.GET("uri/%s?filename=%s"
1155                          % (str(self.mangle_uri(self.uri), "utf-8"), "mydata567"))
1156            e = yield self.assertFailure(d1, Error)
1157            self.assertEquals(e.status, b"410")
1158        d.addCallback(_get_from_bogus_uri)
1159        d.addCallback(self.log, "_got_from_bogus_uri", level=log.UNUSUAL)
1160
1161        # upload a file with PUT
1162        d.addCallback(self.log, "about to try PUT")
1163        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1164                                           b"new.txt contents"))
1165        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1166        d.addCallback(self.failUnlessEqual, "new.txt contents")
1167        # and again with something large enough to use multiple segments,
1168        # and hopefully trigger pauseProducing too
1169        def _new_happy_semantics(ign):
1170            for c in self.clients:
1171                # these get reset somewhere? Whatever.
1172                c.encoding_params['happy'] = 1
1173        d.addCallback(_new_happy_semantics)
1174        d.addCallback(lambda res: self.PUT(public + "/subdir3/big.txt",
1175                                           b"big" * 500000)) # 1.5MB
1176        d.addCallback(lambda res: self.GET(public + "/subdir3/big.txt"))
1177        d.addCallback(lambda res: self.failUnlessEqual(len(res), 1500000))
1178
1179        # can we replace files in place?
1180        d.addCallback(lambda res: self.PUT(public + "/subdir3/new.txt",
1181                                           b"NEWER contents"))
1182        d.addCallback(lambda res: self.GET(public + "/subdir3/new.txt"))
1183        d.addCallback(self.failUnlessEqual, "NEWER contents")
1184
1185        # test unlinked POST
1186        d.addCallback(lambda res: self.POST("uri", t=b"upload",
1187                                            file=("new.txt", b"data" * 10000)))
1188        # and again using the helper, which exercises different upload-status
1189        # display code
1190        d.addCallback(lambda res: self.POST("uri", use_helper=True, t=b"upload",
1191                                            file=("foo.txt", b"data2" * 10000)))
1192
1193        # check that the status page exists
1194        d.addCallback(lambda res: self.GET("status"))
1195        def _got_status(res):
1196            # find an interesting upload and download to look at. LIT files
1197            # are not interesting.
1198            h = self.clients[0].get_history()
1199            for ds in h.list_all_download_statuses():
1200                if ds.get_size() > 200:
1201                    self._down_status = ds.get_counter()
1202            for us in h.list_all_upload_statuses():
1203                if us.get_size() > 200:
1204                    self._up_status = us.get_counter()
1205            rs = list(h.list_all_retrieve_statuses())[0]
1206            self._retrieve_status = rs.get_counter()
1207            ps = list(h.list_all_publish_statuses())[0]
1208            self._publish_status = ps.get_counter()
1209            us = list(h.list_all_mapupdate_statuses())[0]
1210            self._update_status = us.get_counter()
1211
1212            # and that there are some upload- and download- status pages
1213            return self.GET("status/up-%d" % self._up_status)
1214        d.addCallback(_got_status)
1215        def _got_up(res):
1216            return self.GET("status/down-%d" % self._down_status)
1217        d.addCallback(_got_up)
1218        def _got_down(res):
1219            return self.GET("status/mapupdate-%d" % self._update_status)
1220        d.addCallback(_got_down)
1221        def _got_update(res):
1222            return self.GET("status/publish-%d" % self._publish_status)
1223        d.addCallback(_got_update)
1224        def _got_publish(res):
1225            self.failUnlessIn("Publish Results", res)
1226            return self.GET("status/retrieve-%d" % self._retrieve_status)
1227        d.addCallback(_got_publish)
1228        def _got_retrieve(res):
1229            self.failUnlessIn("Retrieve Results", res)
1230        d.addCallback(_got_retrieve)
1231
1232        # check that the helper status page exists
1233        d.addCallback(lambda res: self.GET("helper_status"))
1234        def _got_helper_status(res):
1235            self.failUnless("Bytes Fetched:" in res)
1236            # touch a couple of files in the helper's working directory to
1237            # exercise more code paths
1238            workdir = os.path.join(self.getdir("client0"), "helper")
1239            incfile = os.path.join(workdir, "CHK_incoming", "spurious")
1240            f = open(incfile, "wb")
1241            f.write(b"small file")
1242            f.close()
1243            then = time.time() - 86400*3
1244            now = time.time()
1245            os.utime(incfile, (now, then))
1246            encfile = os.path.join(workdir, "CHK_encoding", "spurious")
1247            f = open(encfile, "wb")
1248            f.write(b"less small file")
1249            f.close()
1250            os.utime(encfile, (now, then))
1251        d.addCallback(_got_helper_status)
1252        # and that the json form exists
1253        d.addCallback(lambda res: self.GET("helper_status?t=json"))
1254        def _got_helper_status_json(res):
1255            data = json.loads(res)
1256            self.failUnlessEqual(data["chk_upload_helper.upload_need_upload"],
1257                                 1)
1258            self.failUnlessEqual(data["chk_upload_helper.incoming_count"], 1)
1259            self.failUnlessEqual(data["chk_upload_helper.incoming_size"], 10)
1260            self.failUnlessEqual(data["chk_upload_helper.incoming_size_old"],
1261                                 10)
1262            self.failUnlessEqual(data["chk_upload_helper.encoding_count"], 1)
1263            self.failUnlessEqual(data["chk_upload_helper.encoding_size"], 15)
1264            self.failUnlessEqual(data["chk_upload_helper.encoding_size_old"],
1265                                 15)
1266        d.addCallback(_got_helper_status_json)
1267
1268        # and check that client[3] (which uses a helper but does not run one
1269        # itself) doesn't explode when you ask for its status
1270        d.addCallback(lambda res: do_http("get",
1271                                          self.helper_webish_url + "status/"))
1272        def _got_non_helper_status(res):
1273            self.failUnlessIn("Recent and Active Operations", res)
1274        d.addCallback(_got_non_helper_status)
1275
1276        # or for helper status with t=json
1277        d.addCallback(lambda res:
1278                      do_http("get",
1279                              self.helper_webish_url + "helper_status?t=json"))
1280        def _got_non_helper_status_json(res):
1281            data = json.loads(res)
1282            self.failUnlessEqual(data, {})
1283        d.addCallback(_got_non_helper_status_json)
1284
1285        # see if the statistics page exists
1286        d.addCallback(lambda res: self.GET("statistics"))
1287        def _got_stats(res):
1288            self.failUnlessIn("Operational Statistics", res)
1289            self.failUnlessIn('  "downloader.files_downloaded": 5,', res)
1290        d.addCallback(_got_stats)
1291        d.addCallback(lambda res: self.GET("statistics?t=json"))
1292        def _got_stats_json(res):
1293            data = json.loads(res)
1294            self.failUnlessEqual(data["counters"]["uploader.files_uploaded"], 5)
1295            self.failUnlessEqual(data["stats"]["chk_upload_helper.upload_need_upload"], 1)
1296        d.addCallback(_got_stats_json)
1297
1298        # TODO: mangle the second segment of a file, to test errors that
1299        # occur after we've already sent some good data, which uses a
1300        # different error path.
1301
1302        # TODO: download a URI with a form
1303        # TODO: create a directory by using a form
1304        # TODO: upload by using a form on the directory page
1305        #    url = base + "somedir/subdir1/freeform_post!!upload"
1306        # TODO: delete a file by using a button on the directory page
1307
1308        return d
1309
1310    @defer.inlineCallbacks
1311    def _test_runner(self, res):
1312        # exercise some of the diagnostic tools in runner.py
1313
1314        # find a share
1315        for (dirpath, dirnames, filenames) in os.walk(self.basedir):
1316            if "storage" not in dirpath:
1317                continue
1318            if not filenames:
1319                continue
1320            pieces = dirpath.split(os.sep)
1321            if (len(pieces) >= 4
1322                and pieces[-4] == "storage"
1323                and pieces[-3] == "shares"):
1324                # we're sitting in .../storage/shares/$START/$SINDEX , and there
1325                # are sharefiles here
1326                filename = os.path.join(dirpath, filenames[0])
1327                # peek at the magic to see if it is a chk share
1328                with open(filename, "rb") as f:
1329                    if ShareFile.is_valid_header(f.read(32)):
1330                        break
1331        else:
1332            self.fail("unable to find any uri_extension files in %r"
1333                      % self.basedir)
1334        log.msg("test_system.SystemTest._test_runner using %r" % filename)
1335
1336        rc,output,err = yield run_cli("debug", "dump-share", "--offsets",
1337                                      unicode_to_argv(filename))
1338        self.failUnlessEqual(rc, 0)
1339
1340        # we only upload a single file, so we can assert some things about
1341        # its size and shares.
1342        self.failUnlessIn("share filename: %s" % quote_output(abspath_expanduser_unicode(filename)), output)
1343        self.failUnlessIn("size: %d\n" % len(self.data), output)
1344        self.failUnlessIn("num_segments: 1\n", output)
1345        # segment_size is always a multiple of needed_shares
1346        self.failUnlessIn("segment_size: %d\n" % mathutil.next_multiple(len(self.data), 3), output)
1347        self.failUnlessIn("total_shares: 10\n", output)
1348        # keys which are supposed to be present
1349        for key in ("size", "num_segments", "segment_size",
1350                    "needed_shares", "total_shares",
1351                    "codec_name", "codec_params", "tail_codec_params",
1352                    #"plaintext_hash", "plaintext_root_hash",
1353                    "crypttext_hash", "crypttext_root_hash",
1354                    "share_root_hash", "UEB_hash"):
1355            self.failUnlessIn("%s: " % key, output)
1356        self.failUnlessIn("  verify-cap: URI:CHK-Verifier:", output)
1357
1358        # now use its storage index to find the other shares using the
1359        # 'find-shares' tool
1360        sharedir, shnum = os.path.split(filename)
1361        storagedir, storage_index_s = os.path.split(sharedir)
1362        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1363        rc,out,err = yield run_cli("debug", "find-shares", storage_index_s,
1364                                   *nodedirs)
1365        self.failUnlessEqual(rc, 0)
1366        sharefiles = [sfn.strip() for sfn in out.splitlines()]
1367        self.failUnlessEqual(len(sharefiles), 10)
1368
1369        # also exercise the 'catalog-shares' tool
1370        nodedirs = [self.getdir("client%d" % i) for i in range(self.numclients)]
1371        rc,out,err = yield run_cli("debug", "catalog-shares", *nodedirs)
1372        self.failUnlessEqual(rc, 0)
1373        descriptions = [sfn.strip() for sfn in out.splitlines()]
1374        self.failUnlessEqual(len(descriptions), 30)
1375        matching = [line
1376                    for line in descriptions
1377                    if line.startswith("CHK %s " % storage_index_s)]
1378        self.failUnlessEqual(len(matching), 10)
1379
1380    def _test_cli(self, res):
1381        # run various CLI commands (in a thread, since they use blocking
1382        # network calls)
1383
1384        private_uri = self._private_node.get_uri()
1385        client0_basedir = self.getdir("client0")
1386
1387        nodeargs = [
1388            "--node-directory", client0_basedir,
1389            ]
1390
1391        d = defer.succeed(None)
1392
1393        # for compatibility with earlier versions, private/root_dir.cap is
1394        # supposed to be treated as an alias named "tahoe:". Start by making
1395        # sure that works, before we add other aliases.
1396
1397        root_file = os.path.join(client0_basedir, "private", "root_dir.cap")
1398        f = open(root_file, "wb")
1399        f.write(private_uri)
1400        f.close()
1401
1402        @defer.inlineCallbacks
1403        def run(ignored, verb, *args, **kwargs):
1404            rc,out,err = yield run_cli(verb, *args, nodeargs=nodeargs, **kwargs)
1405            defer.returnValue((out,err))
1406
1407        def _check_ls(out_and_err, expected_children, unexpected_children=()):
1408            (out, err) = out_and_err
1409            self.failUnlessEqual(err, "")
1410            for s in expected_children:
1411                self.failUnless(s in out, (s,out))
1412            for s in unexpected_children:
1413                self.failIf(s in out, (s,out))
1414
1415        def _check_ls_root(out_and_err):
1416            (out, err) = out_and_err
1417            self.failUnless("personal" in out)
1418            self.failUnless("s2-ro" in out)
1419            self.failUnless("s2-rw" in out)
1420            self.failUnlessEqual(err, "")
1421
1422        # this should reference private_uri
1423        d.addCallback(run, "ls")
1424        d.addCallback(_check_ls, ["personal", "s2-ro", "s2-rw"])
1425
1426        d.addCallback(run, "list-aliases")
1427        def _check_aliases_1(out_and_err):
1428            (out, err) = out_and_err
1429            self.failUnlessEqual(err, "")
1430            self.failUnlessEqual(out.strip(" \n"), "tahoe: %s" % str(private_uri, "ascii"))
1431        d.addCallback(_check_aliases_1)
1432
1433        # now that that's out of the way, remove root_dir.cap and work with
1434        # new files
1435        d.addCallback(lambda res: os.unlink(root_file))
1436        d.addCallback(run, "list-aliases")
1437        def _check_aliases_2(out_and_err):
1438            (out, err) = out_and_err
1439            self.failUnlessEqual(err, "")
1440            self.failUnlessEqual(out, "")
1441        d.addCallback(_check_aliases_2)
1442
1443        d.addCallback(run, "mkdir")
1444        def _got_dir(out_and_err ):
1445            (out, err) = out_and_err
1446            self.failUnless(uri.from_string_dirnode(out.strip()))
1447            return out.strip()
1448        d.addCallback(_got_dir)
1449        d.addCallback(lambda newcap: run(None, "add-alias", "tahoe", newcap))
1450
1451        d.addCallback(run, "list-aliases")
1452        def _check_aliases_3(out_and_err):
1453            (out, err) = out_and_err
1454            self.failUnlessEqual(err, "")
1455            self.failUnless("tahoe: " in out)
1456        d.addCallback(_check_aliases_3)
1457
1458        def _check_empty_dir(out_and_err):
1459            (out, err) = out_and_err
1460            self.failUnlessEqual(out, "")
1461            self.failUnlessEqual(err, "")
1462        d.addCallback(run, "ls")
1463        d.addCallback(_check_empty_dir)
1464
1465        def _check_missing_dir(out_and_err):
1466            # TODO: check that rc==2
1467            (out, err) = out_and_err
1468            self.failUnlessEqual(out, "")
1469            self.failUnlessEqual(err, "No such file or directory\n")
1470        d.addCallback(run, "ls", "bogus")
1471        d.addCallback(_check_missing_dir)
1472
1473        files = []
1474        datas = []
1475        for i in range(10):
1476            fn = os.path.join(self.basedir, "file%d" % i)
1477            files.append(fn)
1478            data = b"data to be uploaded: file%d\n" % i
1479            datas.append(data)
1480            with open(fn, "wb") as f:
1481                f.write(data)
1482
1483        def _check_stdout_against(out_and_err, filenum=None, data=None):
1484            (out, err) = out_and_err
1485            self.failUnlessEqual(err, "")
1486            if filenum is not None:
1487                self.failUnlessEqual(out, str(datas[filenum], "ascii"))
1488            if data is not None:
1489                self.failUnlessEqual(out, data)
1490
1491        # test all both forms of put: from a file, and from stdin
1492        #  tahoe put bar FOO
1493        d.addCallback(run, "put", files[0], "tahoe-file0")
1494        def _put_out(out_and_err):
1495            (out, err) = out_and_err
1496            self.failUnless("URI:LIT:" in out, out)
1497            self.failUnless("201 Created" in err, err)
1498            uri0 = out.strip()
1499            return run(None, "get", uri0)
1500        d.addCallback(_put_out)
1501        d.addCallback(lambda out_err: self.failUnlessEqual(out_err[0], str(datas[0], "ascii")))
1502
1503        d.addCallback(run, "put", files[1], "subdir/tahoe-file1")
1504        #  tahoe put bar tahoe:FOO
1505        d.addCallback(run, "put", files[2], "tahoe:file2")
1506        d.addCallback(run, "put", "--format=SDMF", files[3], "tahoe:file3")
1507        def _check_put_mutable(out_and_err):
1508            (out, err) = out_and_err
1509            self._mutable_file3_uri = out.strip()
1510        d.addCallback(_check_put_mutable)
1511        d.addCallback(run, "get", "tahoe:file3")
1512        d.addCallback(_check_stdout_against, 3)
1513
1514        #  tahoe put FOO
1515        STDIN_DATA = "This is the file to upload from stdin."
1516        d.addCallback(run, "put", "-", "tahoe-file-stdin", stdin=STDIN_DATA)
1517        #  tahoe put tahoe:FOO
1518        d.addCallback(run, "put", "-", "tahoe:from-stdin",
1519                      stdin="Other file from stdin.")
1520
1521        d.addCallback(run, "ls")
1522        d.addCallback(_check_ls, ["tahoe-file0", "file2", "file3", "subdir",
1523                                  "tahoe-file-stdin", "from-stdin"])
1524        d.addCallback(run, "ls", "subdir")
1525        d.addCallback(_check_ls, ["tahoe-file1"])
1526
1527        # tahoe mkdir FOO
1528        d.addCallback(run, "mkdir", "subdir2")
1529        d.addCallback(run, "ls")
1530        # TODO: extract the URI, set an alias with it
1531        d.addCallback(_check_ls, ["subdir2"])
1532
1533        # tahoe get: (to stdin and to a file)
1534        d.addCallback(run, "get", "tahoe-file0")
1535        d.addCallback(_check_stdout_against, 0)
1536        d.addCallback(run, "get", "tahoe:subdir/tahoe-file1")
1537        d.addCallback(_check_stdout_against, 1)
1538        outfile0 = os.path.join(self.basedir, "outfile0")
1539        d.addCallback(run, "get", "file2", outfile0)
1540        def _check_outfile0(out_and_err):
1541            (out, err) = out_and_err
1542            data = open(outfile0,"rb").read()
1543            self.failUnlessEqual(data, b"data to be uploaded: file2\n")
1544        d.addCallback(_check_outfile0)
1545        outfile1 = os.path.join(self.basedir, "outfile0")
1546        d.addCallback(run, "get", "tahoe:subdir/tahoe-file1", outfile1)
1547        def _check_outfile1(out_and_err):
1548            (out, err) = out_and_err
1549            data = open(outfile1,"rb").read()
1550            self.failUnlessEqual(data, b"data to be uploaded: file1\n")
1551        d.addCallback(_check_outfile1)
1552
1553        d.addCallback(run, "unlink", "tahoe-file0")
1554        d.addCallback(run, "unlink", "tahoe:file2")
1555        d.addCallback(run, "ls")
1556        d.addCallback(_check_ls, [], ["tahoe-file0", "file2"])
1557
1558        d.addCallback(run, "ls", "-l")
1559        def _check_ls_l(out_and_err):
1560            (out, err) = out_and_err
1561            lines = out.split("\n")
1562            for l in lines:
1563                if "tahoe-file-stdin" in l:
1564                    self.failUnless(l.startswith("-r-- "), l)
1565                    self.failUnless(" %d " % len(STDIN_DATA) in l)
1566                if "file3" in l:
1567                    self.failUnless(l.startswith("-rw- "), l) # mutable
1568        d.addCallback(_check_ls_l)
1569
1570        d.addCallback(run, "ls", "--uri")
1571        def _check_ls_uri(out_and_err):
1572            (out, err) = out_and_err
1573            lines = out.split("\n")
1574            for l in lines:
1575                if "file3" in l:
1576                    self.failUnless(self._mutable_file3_uri in l)
1577        d.addCallback(_check_ls_uri)
1578
1579        d.addCallback(run, "ls", "--readonly-uri")
1580        def _check_ls_rouri(out_and_err):
1581            (out, err) = out_and_err
1582            lines = out.split("\n")
1583            for l in lines:
1584                if "file3" in l:
1585                    rw_uri = self._mutable_file3_uri
1586                    u = uri.from_string_mutable_filenode(rw_uri)
1587                    ro_uri = str(u.get_readonly().to_string(), "ascii")
1588                    self.failUnless(ro_uri in l)
1589        d.addCallback(_check_ls_rouri)
1590
1591
1592        d.addCallback(run, "mv", "tahoe-file-stdin", "tahoe-moved")
1593        d.addCallback(run, "ls")
1594        d.addCallback(_check_ls, ["tahoe-moved"], ["tahoe-file-stdin"])
1595
1596        d.addCallback(run, "ln", "tahoe-moved", "newlink")
1597        d.addCallback(run, "ls")
1598        d.addCallback(_check_ls, ["tahoe-moved", "newlink"])
1599
1600        d.addCallback(run, "cp", "tahoe:file3", "tahoe:file3-copy")
1601        d.addCallback(run, "ls")
1602        d.addCallback(_check_ls, ["file3", "file3-copy"])
1603        d.addCallback(run, "get", "tahoe:file3-copy")
1604        d.addCallback(_check_stdout_against, 3)
1605
1606        # copy from disk into tahoe
1607        d.addCallback(run, "cp", files[4], "tahoe:file4")
1608        d.addCallback(run, "ls")
1609        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1610        d.addCallback(run, "get", "tahoe:file4")
1611        d.addCallback(_check_stdout_against, 4)
1612
1613        # copy from tahoe into disk
1614        target_filename = os.path.join(self.basedir, "file-out")
1615        d.addCallback(run, "cp", "tahoe:file4", target_filename)
1616        def _check_cp_out(out_and_err):
1617            (out, err) = out_and_err
1618            self.failUnless(os.path.exists(target_filename))
1619            got = open(target_filename,"rb").read()
1620            self.failUnlessEqual(got, datas[4])
1621        d.addCallback(_check_cp_out)
1622
1623        # copy from disk to disk (silly case)
1624        target2_filename = os.path.join(self.basedir, "file-out-copy")
1625        d.addCallback(run, "cp", target_filename, target2_filename)
1626        def _check_cp_out2(out_and_err):
1627            (out, err) = out_and_err
1628            self.failUnless(os.path.exists(target2_filename))
1629            got = open(target2_filename,"rb").read()
1630            self.failUnlessEqual(got, datas[4])
1631        d.addCallback(_check_cp_out2)
1632
1633        # copy from tahoe into disk, overwriting an existing file
1634        d.addCallback(run, "cp", "tahoe:file3", target_filename)
1635        def _check_cp_out3(out_and_err):
1636            (out, err) = out_and_err
1637            self.failUnless(os.path.exists(target_filename))
1638            got = open(target_filename,"rb").read()
1639            self.failUnlessEqual(got, datas[3])
1640        d.addCallback(_check_cp_out3)
1641
1642        # copy from disk into tahoe, overwriting an existing immutable file
1643        d.addCallback(run, "cp", files[5], "tahoe:file4")
1644        d.addCallback(run, "ls")
1645        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1646        d.addCallback(run, "get", "tahoe:file4")
1647        d.addCallback(_check_stdout_against, 5)
1648
1649        # copy from disk into tahoe, overwriting an existing mutable file
1650        d.addCallback(run, "cp", files[5], "tahoe:file3")
1651        d.addCallback(run, "ls")
1652        d.addCallback(_check_ls, ["file3", "file3-copy", "file4"])
1653        d.addCallback(run, "get", "tahoe:file3")
1654        d.addCallback(_check_stdout_against, 5)
1655
1656        # recursive copy: setup
1657        dn = os.path.join(self.basedir, "dir1")
1658        os.makedirs(dn)
1659        with open(os.path.join(dn, "rfile1"), "wb") as f:
1660            f.write(b"rfile1")
1661        with open(os.path.join(dn, "rfile2"), "wb") as f:
1662            f.write(b"rfile2")
1663        with open(os.path.join(dn, "rfile3"), "wb") as f:
1664            f.write(b"rfile3")
1665        sdn2 = os.path.join(dn, "subdir2")
1666        os.makedirs(sdn2)
1667        with open(os.path.join(sdn2, "rfile4"), "wb") as f:
1668            f.write(b"rfile4")
1669        with open(os.path.join(sdn2, "rfile5"), "wb") as f:
1670            f.write(b"rfile5")
1671
1672        # from disk into tahoe
1673        d.addCallback(run, "cp", "-r", dn, "tahoe:")
1674        d.addCallback(run, "ls")
1675        d.addCallback(_check_ls, ["dir1"])
1676        d.addCallback(run, "ls", "dir1")
1677        d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1678                      ["rfile4", "rfile5"])
1679        d.addCallback(run, "ls", "tahoe:dir1/subdir2")
1680        d.addCallback(_check_ls, ["rfile4", "rfile5"],
1681                      ["rfile1", "rfile2", "rfile3"])
1682        d.addCallback(run, "get", "dir1/subdir2/rfile4")
1683        d.addCallback(_check_stdout_against, data="rfile4")
1684
1685        # and back out again
1686        dn_copy = os.path.join(self.basedir, "dir1-copy")
1687        d.addCallback(run, "cp", "--verbose", "-r", "tahoe:dir1", dn_copy)
1688        def _check_cp_r_out(out_and_err):
1689            (out, err) = out_and_err
1690            def _cmp(name):
1691                old = open(os.path.join(dn, name), "rb").read()
1692                newfn = os.path.join(dn_copy, "dir1", name)
1693                self.failUnless(os.path.exists(newfn))
1694                new = open(newfn, "rb").read()
1695                self.failUnlessEqual(old, new)
1696            _cmp("rfile1")
1697            _cmp("rfile2")
1698            _cmp("rfile3")
1699            _cmp(os.path.join("subdir2", "rfile4"))
1700            _cmp(os.path.join("subdir2", "rfile5"))
1701        d.addCallback(_check_cp_r_out)
1702
1703        # and copy it a second time, which ought to overwrite the same files
1704        d.addCallback(run, "cp", "-r", "tahoe:dir1", dn_copy)
1705
1706        # and again, only writing filecaps
1707        dn_copy2 = os.path.join(self.basedir, "dir1-copy-capsonly")
1708        d.addCallback(run, "cp", "-r", "--caps-only", "tahoe:dir1", dn_copy2)
1709        def _check_capsonly(out_and_err):
1710            # these should all be LITs
1711            (out, err) = out_and_err
1712            x = open(os.path.join(dn_copy2, "dir1", "subdir2", "rfile4")).read()
1713            y = uri.from_string_filenode(x)
1714            self.failUnlessEqual(y.data, b"rfile4")
1715        d.addCallback(_check_capsonly)
1716
1717        # and tahoe-to-tahoe
1718        d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1719        d.addCallback(run, "ls")
1720        d.addCallback(_check_ls, ["dir1", "dir1-copy"])
1721        d.addCallback(run, "ls", "dir1-copy/dir1")
1722        d.addCallback(_check_ls, ["rfile1", "rfile2", "rfile3", "subdir2"],
1723                      ["rfile4", "rfile5"])
1724        d.addCallback(run, "ls", "tahoe:dir1-copy/dir1/subdir2")
1725        d.addCallback(_check_ls, ["rfile4", "rfile5"],
1726                      ["rfile1", "rfile2", "rfile3"])
1727        d.addCallback(run, "get", "dir1-copy/dir1/subdir2/rfile4")
1728        d.addCallback(_check_stdout_against, data="rfile4")
1729
1730        # and copy it a second time, which ought to overwrite the same files
1731        d.addCallback(run, "cp", "-r", "tahoe:dir1", "tahoe:dir1-copy")
1732
1733        # tahoe_ls doesn't currently handle the error correctly: it tries to
1734        # JSON-parse a traceback.
1735##         def _ls_missing(res):
1736##             argv = nodeargs + ["ls", "bogus"]
1737##             return self._run_cli(argv)
1738##         d.addCallback(_ls_missing)
1739##         def _check_ls_missing((out,err)):
1740##             print("OUT", out)
1741##             print("ERR", err)
1742##             self.failUnlessEqual(err, "")
1743##         d.addCallback(_check_ls_missing)
1744
1745        return d
1746
1747    # In CI this test can be very slow, so give it a longer timeout:
1748    test_filesystem.timeout = 360  # type: ignore[attr-defined]
1749
1750
1751    def test_filesystem_with_cli_in_subprocess(self):
1752        # We do this in a separate test so that test_filesystem doesn't skip if we can't run bin/tahoe.
1753
1754        d = self.set_up_nodes()
1755        def _new_happy_semantics(ign):
1756            for c in self.clients:
1757                c.encoding_params['happy'] = 1
1758        d.addCallback(_new_happy_semantics)
1759
1760        def _run_in_subprocess(ignored, verb, *args, **kwargs):
1761            stdin = kwargs.get("stdin")
1762            # XXX https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3548
1763            env = kwargs.get("env", os.environ)
1764            # Python warnings from the child process don't matter.
1765            env["PYTHONWARNINGS"] = "ignore"
1766            newargs = ["--node-directory", self.getdir("client0"), verb] + list(args)
1767            return self.run_bintahoe(newargs, stdin=stdin, env=env)
1768
1769        def _check_succeeded(res):
1770            out, err, rc_or_sig = res
1771            self.failUnlessEqual(rc_or_sig, 0, str(res))
1772
1773        d.addCallback(_run_in_subprocess, "create-alias", "newalias")
1774        d.addCallback(_check_succeeded)
1775
1776        STDIN_DATA = b"This is the file to upload from stdin."
1777        d.addCallback(_run_in_subprocess, "put", "-", "newalias:tahoe-file", stdin=STDIN_DATA)
1778        d.addCallback(_check_succeeded)
1779
1780        def _mv_with_http_proxy(ign):
1781            env = os.environ
1782            env['http_proxy'] = env['HTTP_PROXY'] = "http://127.0.0.0:12345"  # invalid address
1783            return _run_in_subprocess(None, "mv", "newalias:tahoe-file", "newalias:tahoe-moved", env=env)
1784        d.addCallback(_mv_with_http_proxy)
1785        d.addCallback(_check_succeeded)
1786
1787        d.addCallback(_run_in_subprocess, "ls", "newalias:")
1788        def _check_ls(res):
1789            out, err, rc_or_sig = res
1790            self.failUnlessEqual(rc_or_sig, 0, str(res))
1791            self.failUnlessIn(b"tahoe-moved", out)
1792            self.failIfIn(b"tahoe-file", out)
1793        d.addCallback(_check_ls)
1794        return d
1795
1796    def _test_checker(self, res):
1797        ut = upload.Data(b"too big to be literal" * 200, convergence=None)
1798        d = self._personal_node.add_file(u"big file", ut)
1799
1800        d.addCallback(lambda res: self._personal_node.check(Monitor()))
1801        def _check_dirnode_results(r):
1802            self.failUnless(r.is_healthy())
1803        d.addCallback(_check_dirnode_results)
1804        d.addCallback(lambda res: self._personal_node.check(Monitor(), verify=True))
1805        d.addCallback(_check_dirnode_results)
1806
1807        d.addCallback(lambda res: self._personal_node.get(u"big file"))
1808        def _got_chk_filenode(n):
1809            self.failUnless(isinstance(n, ImmutableFileNode))
1810            d = n.check(Monitor())
1811            def _check_filenode_results(r):
1812                self.failUnless(r.is_healthy())
1813            d.addCallback(_check_filenode_results)
1814            d.addCallback(lambda res: n.check(Monitor(), verify=True))
1815            d.addCallback(_check_filenode_results)
1816            return d
1817        d.addCallback(_got_chk_filenode)
1818
1819        d.addCallback(lambda res: self._personal_node.get(u"sekrit data"))
1820        def _got_lit_filenode(n):
1821            self.failUnless(isinstance(n, LiteralFileNode))
1822            d = n.check(Monitor())
1823            def _check_lit_filenode_results(r):
1824                self.failUnlessEqual(r, None)
1825            d.addCallback(_check_lit_filenode_results)
1826            d.addCallback(lambda res: n.check(Monitor(), verify=True))
1827            d.addCallback(_check_lit_filenode_results)
1828            return d
1829        d.addCallback(_got_lit_filenode)
1830        return d
1831
1832
1833class Connections(SystemTestMixin, unittest.TestCase):
1834    FORCE_FOOLSCAP_FOR_STORAGE = True
1835
1836    def test_rref(self):
1837        # The way the listening port is created is via
1838        # SameProcessStreamEndpointAssigner (allmydata.test.common), which then
1839        # makes an endpoint string parsed by AdoptedServerPort. The latter does
1840        # dup(fd), which results in the filedescriptor staying alive _until the
1841        # test ends_. That means that when we disown the service, we still have
1842        # the listening port there on the OS level! Just the resulting
1843        # connections aren't handled. So this test relies on aggressive
1844        # timeouts in the HTTP client and presumably some equivalent in
1845        # Foolscap, since connection refused does _not_ happen.
1846        self.basedir = "system/Connections/rref-foolscap-{}".format(
1847            self.FORCE_FOOLSCAP_FOR_STORAGE
1848        )
1849        d = self.set_up_nodes(2)
1850        def _start(ign):
1851            self.c0 = self.clients[0]
1852            nonclients = [s for s in self.c0.storage_broker.get_connected_servers()
1853                          if s.get_serverid() != self.c0.get_long_nodeid()]
1854            self.failUnlessEqual(len(nonclients), 1)
1855
1856            self.s1 = nonclients[0]  # s1 is the server, not c0
1857            self.s1_storage_server = self.s1.get_storage_server()
1858            self.assertIsNot(self.s1_storage_server, None)
1859            self.assertTrue(self.s1.is_connected())
1860        d.addCallback(_start)
1861
1862        # now shut down the server
1863        d.addCallback(lambda ign: self.clients[1].disownServiceParent())
1864
1865        # kill any persistent http connections that might continue to work
1866        d.addCallback(lambda ign: self.close_idle_http_connections())
1867
1868        # and wait for the client to notice
1869        def _poll():
1870            return len(self.c0.storage_broker.get_connected_servers()) == 1
1871        d.addCallback(lambda ign: self.poll(_poll))
1872
1873        def _down(ign):
1874            self.assertFalse(self.s1.is_connected())
1875            storage_server = self.s1.get_storage_server()
1876            self.assertIsNot(storage_server, None)
1877            self.assertEqual(storage_server, self.s1_storage_server)
1878        d.addCallback(_down)
1879        return d
1880
1881
1882class HTTPSystemTest(SystemTest):
1883    """HTTP storage protocol variant of the system tests."""
1884
1885    FORCE_FOOLSCAP_FOR_STORAGE = False
1886
1887
1888
1889class HTTPConnections(Connections):
1890    """HTTP storage protocol variant of the connections tests."""
1891    FORCE_FOOLSCAP_FOR_STORAGE = False
1892
Note: See TracBrowser for help on using the repository browser.