""" Tests for ``allmydata.scripts.tahoe_run``. """ from __future__ import annotations import re from io import ( StringIO, ) from hypothesis.strategies import text from hypothesis import given, assume from testtools.matchers import ( Contains, Equals, ) from twisted.python.filepath import ( FilePath, ) from twisted.internet.testing import ( MemoryReactor, ) from twisted.python.failure import ( Failure, ) from twisted.internet.error import ( ConnectionDone, ) from twisted.internet.test.modulehelpers import ( AlternateReactor, ) from ...scripts.tahoe_run import ( DaemonizeTheRealService, RunOptions, run, ) from ...util.pid import ( check_pid_process, InvalidPidFile, ) from ...scripts.runner import ( parse_options ) from ..common import ( SyncTestCase, ) class DaemonizeTheRealServiceTests(SyncTestCase): """ Tests for ``DaemonizeTheRealService``. """ def _verify_error(self, config, expected): """ Assert that when ``DaemonizeTheRealService`` is started using the given configuration it writes the given message to stderr and stops the reactor. :param bytes config: The contents of a ``tahoe.cfg`` file to give to the service. :param bytes expected: A string to assert appears in stderr after the service starts. """ nodedir = FilePath(self.mktemp()) nodedir.makedirs() nodedir.child("tahoe.cfg").setContent(config.encode("ascii")) nodedir.child("tahoe-client.tac").touch() options = parse_options(["run", nodedir.path]) stdout = options.stdout = StringIO() stderr = options.stderr = StringIO() run_options = options.subOptions reactor = MemoryReactor() with AlternateReactor(reactor): service = DaemonizeTheRealService( "client", nodedir.path, run_options, ) service.startService() # We happen to know that the service uses reactor.callWhenRunning # to schedule all its work (though I couldn't tell you *why*). # Make sure those scheduled calls happen. waiting = reactor.whenRunningHooks[:] del reactor.whenRunningHooks[:] for f, a, k in waiting: f(*a, **k) self.assertThat( reactor.hasStopped, Equals(True), ) self.assertThat( stdout.getvalue(), Equals(""), ) self.assertThat( stderr.getvalue(), Contains(expected), ) def test_unknown_config(self): """ If there are unknown items in the node configuration file then a short message introduced with ``"Configuration error:"`` is written to stderr. """ self._verify_error("[invalid-section]\n", "Configuration error:") def test_port_assignment_required(self): """ If ``tub.port`` is configured to use port 0 then a short message rejecting this configuration is written to stderr. """ self._verify_error( """ [node] tub.port = 0 """, "tub.port cannot be 0", ) def test_privacy_error(self): """ If ``reveal-IP-address`` is set to false and the tub is not configured in a way that avoids revealing the node's IP address, a short message about privacy is written to stderr. """ self._verify_error( """ [node] tub.port = AUTO reveal-IP-address = false """, "Privacy requested", ) class DaemonizeStopTests(SyncTestCase): """ Tests relating to stopping the daemon """ def setUp(self): self.nodedir = FilePath(self.mktemp()) self.nodedir.makedirs() config = "" self.nodedir.child("tahoe.cfg").setContent(config.encode("ascii")) self.nodedir.child("tahoe-client.tac").touch() # arrange to know when reactor.stop() is called self.reactor = MemoryReactor() self.stop_calls = [] def record_stop(): self.stop_calls.append(object()) self.reactor.stop = record_stop super().setUp() def _make_daemon(self, extra_argv: list[str]) -> DaemonizeTheRealService: """ Create the daemonization service. :param extra_argv: Extra arguments to pass between ``run`` and the node path. """ options = parse_options(["run"] + extra_argv + [self.nodedir.path]) options.stdout = StringIO() options.stderr = StringIO() options.stdin = StringIO() run_options = options.subOptions return DaemonizeTheRealService( "client", self.nodedir.path, run_options, ) def _run_daemon(self) -> None: """ Simulate starting up the reactor so the daemon plugin can do its stuff. """ # We happen to know that the service uses reactor.callWhenRunning # to schedule all its work (though I couldn't tell you *why*). # Make sure those scheduled calls happen. waiting = self.reactor.whenRunningHooks[:] del self.reactor.whenRunningHooks[:] for f, a, k in waiting: f(*a, **k) def _close_stdin(self) -> None: """ Simulate closing the daemon plugin's stdin. """ # there should be a single reader: our StandardIO process # reader for stdin. Simulate it closing. for r in self.reactor.getReaders(): r.connectionLost(Failure(ConnectionDone())) def test_stop_on_stdin_close(self): """ We stop when stdin is closed. """ with AlternateReactor(self.reactor): service = self._make_daemon([]) service.startService() self._run_daemon() self._close_stdin() self.assertEqual(len(self.stop_calls), 1) def test_allow_stdin_close(self): """ If --allow-stdin-close is specified then closing stdin doesn't stop the process """ with AlternateReactor(self.reactor): service = self._make_daemon(["--allow-stdin-close"]) service.startService() self._run_daemon() self._close_stdin() self.assertEqual(self.stop_calls, []) class RunTests(SyncTestCase): """ Tests for ``run``. """ def test_non_numeric_pid(self): """ If the pidfile exists but does not contain a numeric value, a complaint to this effect is written to stderr. """ basedir = FilePath(self.mktemp()).asTextMode() basedir.makedirs() basedir.child(u"running.process").setContent(b"foo") basedir.child(u"tahoe-client.tac").setContent(b"") config = RunOptions() config.stdout = StringIO() config.stderr = StringIO() config['basedir'] = basedir.path config.twistd_args = [] reactor = MemoryReactor() runs = [] result_code = run(reactor, config, runApp=runs.append) self.assertThat( config.stderr.getvalue(), Contains("found invalid PID file in"), ) # because the pidfile is invalid we shouldn't get to the # .run() call itself. self.assertThat(runs, Equals([])) self.assertThat(result_code, Equals(1)) good_file_content_re = re.compile(r"\s*[0-9]*\s[0-9]*\s*", re.M) @given(text()) def test_pidfile_contents(self, content): """ invalid contents for a pidfile raise errors """ assume(not self.good_file_content_re.match(content)) pidfile = FilePath("pidfile") pidfile.setContent(content.encode("utf8")) with self.assertRaises(InvalidPidFile): with check_pid_process(pidfile): pass