"""
Ported to Python 3.
"""

import os
from twisted.trial import unittest
from twisted.internet import defer, error
from twisted.python.usage import UsageError
from io import StringIO
from unittest import mock
from ..util import i2p_provider
from ..scripts import create_node, runner

def mock_txi2p(txi2p):
    return mock.patch("allmydata.util.i2p_provider._import_txi2p",
                      return_value=txi2p)

def mock_i2p(i2p):
    return mock.patch("allmydata.util.i2p_provider._import_i2p",
                      return_value=i2p)

def make_cli_config(basedir, *argv):
    parent = runner.Options()
    cli_config = create_node.CreateNodeOptions()
    cli_config.parent = parent
    cli_config.parseOptions(argv)
    cli_config["basedir"] = basedir
    cli_config.stdout = StringIO()
    return cli_config

class TryToConnect(unittest.TestCase):
    def test_try(self):
        reactor = object()
        txi2p = mock.Mock()
        d = defer.succeed(True)
        txi2p.testAPI = mock.Mock(return_value=d)
        ep = object()
        stdout = StringIO()
        with mock.patch("allmydata.util.i2p_provider.clientFromString",
                        return_value=ep) as cfs:
            d = i2p_provider._try_to_connect(reactor, "desc", stdout, txi2p)
        r = self.successResultOf(d)
        self.assertTrue(r)
        cfs.assert_called_with(reactor, "desc")
        txi2p.testAPI.assert_called_with(reactor, 'SAM', ep)

    def test_try_handled_error(self):
        reactor = object()
        txi2p = mock.Mock()
        d = defer.fail(error.ConnectError("oops"))
        txi2p.testAPI = mock.Mock(return_value=d)
        ep = object()
        stdout = StringIO()
        with mock.patch("allmydata.util.i2p_provider.clientFromString",
                        return_value=ep) as cfs:
            d = i2p_provider._try_to_connect(reactor, "desc", stdout, txi2p)
        r = self.successResultOf(d)
        self.assertIs(r, None)
        cfs.assert_called_with(reactor, "desc")
        txi2p.testAPI.assert_called_with(reactor, 'SAM', ep)
        self.assertEqual(stdout.getvalue(),
                         "Unable to reach I2P SAM API at 'desc': "
                         "An error occurred while connecting: oops.\n")

    def test_try_unhandled_error(self):
        reactor = object()
        txi2p = mock.Mock()
        d = defer.fail(ValueError("oops"))
        txi2p.testAPI = mock.Mock(return_value=d)
        ep = object()
        stdout = StringIO()
        with mock.patch("allmydata.util.i2p_provider.clientFromString",
                        return_value=ep) as cfs:
            d = i2p_provider._try_to_connect(reactor, "desc", stdout, txi2p)
        f = self.failureResultOf(d)
        self.assertIsInstance(f.value, ValueError)
        self.assertEqual(str(f.value), "oops")
        cfs.assert_called_with(reactor, "desc")
        txi2p.testAPI.assert_called_with(reactor, 'SAM', ep)
        self.assertEqual(stdout.getvalue(), "")

class ConnectToI2P(unittest.TestCase):
    def _do_test_connect(self, endpoint, reachable):
        reactor = object()
        txi2p = object()
        args = []
        if endpoint:
            args = ["--i2p-sam-port=%s" % endpoint]
        cli_config = make_cli_config("basedir", "--listen=i2p", *args)
        stdout = cli_config.stdout
        expected_port = "tcp:127.0.0.1:7656"
        if endpoint:
            expected_port = endpoint
        tried = []
        def _try_to_connect(reactor, port, stdout, txi2p):
            tried.append( (reactor, port, stdout, txi2p) )
            if not reachable:
                return defer.succeed(None)
            if port == expected_port:
                return defer.succeed(True)
            return defer.succeed(None)

        with mock.patch("allmydata.util.i2p_provider._try_to_connect",
                        _try_to_connect):
            d = i2p_provider._connect_to_i2p(reactor, cli_config, txi2p)
        if not reachable:
            f = self.failureResultOf(d)
            self.assertIsInstance(f.value, ValueError)
            self.assertEqual(str(f.value),
                             "unable to reach any default I2P SAM port")
            return
        successful_port = self.successResultOf(d)
        self.assertEqual(successful_port, expected_port)
        expected = [(reactor, "tcp:127.0.0.1:7656", stdout, txi2p)]
        if endpoint:
            expected = [(reactor, endpoint, stdout, txi2p)]
        self.assertEqual(tried, expected)

    def test_connect(self):
        return self._do_test_connect(None, True)
    def test_connect_endpoint(self):
        return self._do_test_connect("tcp:other:port", True)
    def test_connect_unreachable(self):
        return self._do_test_connect(None, False)


class CreateDest(unittest.TestCase):
    def test_no_txi2p(self):
        with mock.patch("allmydata.util.i2p_provider._import_txi2p",
                        return_value=None):
            d = i2p_provider.create_config("reactor", "cli_config")
            f = self.failureResultOf(d)
            self.assertIsInstance(f.value, ValueError)
            self.assertEqual(str(f.value),
                             "Cannot create I2P Destination without txi2p. "
                             "Please 'pip install tahoe-lafs[i2p]' to fix this.")

    def _do_test_launch(self, executable):
        basedir = self.mktemp()
        os.mkdir(basedir)
        args = ["--listen=i2p", "--i2p-launch"]
        if executable:
            args.append("--i2p-executable=%s" % executable)
        self.assertRaises(UsageError, make_cli_config, basedir, *args)

    def test_launch(self):
        return self._do_test_launch(None)
    def test_launch_executable(self):
        return self._do_test_launch("myi2p")

    def test_sam_endpoint(self):
        basedir = self.mktemp()
        os.mkdir(basedir)
        private_dir = os.path.join(basedir, "private")
        os.mkdir(private_dir)
        privkeyfile = os.path.abspath(os.path.join(private_dir, "i2p_dest.privkey"))
        reactor = object()
        cli_config = make_cli_config(basedir, "--listen=i2p")
        connect_to_i2p = mock.Mock(return_value=defer.succeed("goodport"))
        txi2p = mock.Mock()
        ep = object()
        dest = mock.Mock()
        dest.host = "FOOBAR.b32.i2p"
        txi2p.generateDestination = mock.Mock(return_value=defer.succeed(dest))

        with mock_txi2p(txi2p):
            with mock.patch("allmydata.util.i2p_provider._connect_to_i2p",
                            connect_to_i2p):
                with mock.patch("allmydata.util.i2p_provider.clientFromString",
                                return_value=ep) as cfs:
                    d = i2p_provider.create_config(reactor, cli_config)
                    i2p_config = self.successResultOf(d)

        connect_to_i2p.assert_called_with(reactor, cli_config, txi2p)
        cfs.assert_called_with(reactor, "goodport")
        txi2p.generateDestination.assert_called_with(reactor, privkeyfile, 'SAM', ep)

        expected = {"sam.port": "goodport",
                    "dest": "true",
                    "dest.port": "3457",
                    "dest.private_key_file": os.path.join("private",
                                                          "i2p_dest.privkey"),
                    }
        self.assertEqual(dict(i2p_config.node_config["i2p"]), expected)
        self.assertEqual(i2p_config.tub_ports, ["listen:i2p"])
        self.assertEqual(i2p_config.tub_locations, ["i2p:FOOBAR.b32.i2p:3457"])

_None = object()
class FakeConfig(dict):
    def get_config(self, section, option, default=_None, boolean=False):
        if section != "i2p":
            raise ValueError(section)
        value = self.get(option, default)
        if value is _None:
            raise KeyError
        return value

class Provider(unittest.TestCase):
    def test_build(self):
        i2p_provider.create("reactor", FakeConfig())

    def test_handler_disabled(self):
        p = i2p_provider.create("reactor", FakeConfig(enabled=False))
        self.assertEqual(p.get_i2p_handler(), None)

    def test_handler_no_i2p(self):
        with mock_i2p(None):
            p = i2p_provider.create("reactor", FakeConfig())
        self.assertEqual(p.get_i2p_handler(), None)

    def test_handler_sam_endpoint(self):
        i2p = mock.Mock()
        handler = object()
        i2p.sam_endpoint = mock.Mock(return_value=handler)
        ep = object()
        reactor = object()

        with mock_i2p(i2p):
            p = i2p_provider.create(reactor,
                                    FakeConfig(**{"sam.port": "ep_desc"}))
            with mock.patch("allmydata.util.i2p_provider.clientFromString",
                            return_value=ep) as cfs:
                h = p.get_i2p_handler()
        cfs.assert_called_with(reactor, "ep_desc")
        self.assertIs(h, handler)
        i2p.sam_endpoint.assert_called_with(ep, keyfile=None)

    def test_handler_launch(self):
        i2p = mock.Mock()
        handler = object()
        i2p.launch = mock.Mock(return_value=handler)
        reactor = object()

        with mock_i2p(i2p):
            p = i2p_provider.create(reactor,
                                    FakeConfig(launch=True))
        h = p.get_i2p_handler()
        self.assertIs(h, handler)
        i2p.launch.assert_called_with(i2p_configdir=None, i2p_binary=None)

    def test_handler_launch_configdir(self):
        i2p = mock.Mock()
        handler = object()
        i2p.launch = mock.Mock(return_value=handler)
        reactor = object()

        with mock_i2p(i2p):
            p = i2p_provider.create(reactor,
                                    FakeConfig(launch=True,
                                               **{"i2p.configdir": "configdir"}))
        h = p.get_i2p_handler()
        self.assertIs(h, handler)
        i2p.launch.assert_called_with(i2p_configdir="configdir", i2p_binary=None)

    def test_handler_launch_configdir_executable(self):
        i2p = mock.Mock()
        handler = object()
        i2p.launch = mock.Mock(return_value=handler)
        reactor = object()

        with mock_i2p(i2p):
            p = i2p_provider.create(reactor,
                                    FakeConfig(launch=True,
                                               **{"i2p.configdir": "configdir",
                                                  "i2p.executable": "myi2p",
                                               }))
        h = p.get_i2p_handler()
        self.assertIs(h, handler)
        i2p.launch.assert_called_with(i2p_configdir="configdir", i2p_binary="myi2p")

    def test_handler_configdir(self):
        i2p = mock.Mock()
        handler = object()
        i2p.local_i2p = mock.Mock(return_value=handler)
        reactor = object()

        with mock_i2p(i2p):
            p = i2p_provider.create(reactor,
                                    FakeConfig(**{"i2p.configdir": "configdir"}))
        h = p.get_i2p_handler()
        i2p.local_i2p.assert_called_with("configdir")
        self.assertIs(h, handler)

    def test_handler_launch_executable(self):
        i2p = mock.Mock()
        handler = object()
        i2p.launch = mock.Mock(return_value=handler)
        reactor = object()

        with mock_i2p(i2p):
            p = i2p_provider.create(reactor,
                                    FakeConfig(launch=True,
                                               **{"i2p.executable": "myi2p"}))
        h = p.get_i2p_handler()
        self.assertIs(h, handler)
        i2p.launch.assert_called_with(i2p_configdir=None, i2p_binary="myi2p")

    def test_handler_default(self):
        i2p = mock.Mock()
        handler = object()
        i2p.default = mock.Mock(return_value=handler)
        reactor = object()

        with mock_i2p(i2p):
            p = i2p_provider.create(reactor, FakeConfig())
        h = p.get_i2p_handler()
        self.assertIs(h, handler)
        i2p.default.assert_called_with(reactor, keyfile=None)

class ProviderListener(unittest.TestCase):
    def test_listener(self):
        """Does the I2P Provider object's get_listener() method correctly
        convert the [i2p] section of tahoe.cfg into an
        endpoint/descriptor?
        """

        i2p = mock.Mock()
        handler = object()
        i2p.local_i2p = mock.Mock(return_value=handler)
        reactor = object()

        privkeyfile = os.path.join("private", "i2p_dest.privkey")
        with mock_i2p(i2p):
            p = i2p_provider.create(reactor,
                                    FakeConfig(**{
                                        "i2p.configdir": "configdir",
                                        "sam.port": "good:port",
                                        "dest": "true",
                                        "dest.port": "3457",
                                        "dest.private_key_file": privkeyfile,
                                    }))
            endpoint_or_description = p.get_listener()
        self.assertEqual(endpoint_or_description,
                         "i2p:%s:3457:api=SAM:apiEndpoint=good\\:port" % privkeyfile)

class Provider_CheckI2PConfig(unittest.TestCase):
    def test_default(self):
        # default config doesn't start an I2P service, so it should be
        # happy both with and without txi2p

        p = i2p_provider.create("reactor", FakeConfig())
        p.check_dest_config()

        with mock_txi2p(None):
            p = i2p_provider.create("reactor", FakeConfig())
            p.check_dest_config()

    def test_no_txi2p(self):
        with mock_txi2p(None):
            with self.assertRaises(ValueError) as ctx:
                i2p_provider.create("reactor", FakeConfig(dest=True))
            self.assertEqual(
                str(ctx.exception),
                "Cannot create I2P Destination without txi2p. "
                "Please 'pip install tahoe-lafs[i2p]' to fix."
            )

    def test_no_launch_no_control(self):
        with self.assertRaises(ValueError) as ctx:
            i2p_provider.create("reactor", FakeConfig(dest=True))
        self.assertEqual(
            str(ctx.exception),
            "[i2p] dest = true, but we have neither "
            "sam.port= nor launch=true nor configdir="
        )

    def test_missing_keys(self):
        with self.assertRaises(ValueError) as ctx:
            i2p_provider.create("reactor",
                                FakeConfig(
                                    dest=True,
                                    **{"sam.port": "x",
                                    }
                                ))
        self.assertEqual(str(ctx.exception), "[i2p] dest = true, "
                         "but dest.port= is missing")

        with self.assertRaises(ValueError) as ctx:
            i2p_provider.create("reactor",
                                FakeConfig(dest=True,
                                           **{"sam.port": "x",
                                              "dest.port": "y",
                                           }))
        self.assertEqual(
            str(ctx.exception),
            "[i2p] dest = true, "
            "but dest.private_key_file= is missing"
        )

    def test_launch_not_implemented(self):
        with self.assertRaises(NotImplementedError) as ctx:
            i2p_provider.create("reactor",
                                FakeConfig(dest=True, launch=True,
                                           **{"dest.port": "x",
                                               "dest.private_key_file": "y",
                                           }))
        self.assertEqual(
            str(ctx.exception),
            "[i2p] launch is under development."
        )

    def test_ok(self):
        i2p_provider.create(
            "reactor",
            FakeConfig(
                dest=True, **{
                    "sam.port": "x",
                    "dest.port": "y",
                    "dest.private_key_file": "z",
                }
            )
        )
