source: trunk/integration/util.py

Last change on this file was 9ee10af, checked in by Itamar Turner-Trauring <itamar@…>, at 2023-09-05T16:03:50Z

Start on benchmarking infrastructure: a framework for starting nodes.

1"""
2General functionality useful for the implementation of integration tests.
3"""
4
5from __future__ import annotations
6
7from contextlib import contextmanager
8from typing import Any
9from typing_extensions import Literal
10from tempfile import NamedTemporaryFile
11import sys
12import time
13import json
14from os import mkdir, environ
15from os.path import exists, join, basename
16from io import StringIO, BytesIO
17from subprocess import check_output
18
19from twisted.python.filepath import (
20    FilePath,
21)
22from twisted.internet.defer import Deferred, succeed
23from twisted.internet.protocol import ProcessProtocol
24from twisted.internet.error import ProcessExitedAlready, ProcessDone
25from twisted.internet.threads import deferToThread
26from twisted.internet.interfaces import IProcessTransport, IReactorProcess
27
28from attrs import frozen, evolve
29import requests
30
31from cryptography.hazmat.primitives.asymmetric import rsa
32from cryptography.hazmat.backends import default_backend
33from cryptography.hazmat.primitives.serialization import (
34    Encoding,
35    PrivateFormat,
36    NoEncryption,
37)
38
39from paramiko.rsakey import RSAKey
40from boltons.funcutils import wraps
41
42from allmydata.util import base32
43from allmydata.util.configutil import (
44    get_config,
45    set_config,
46    write_config,
47)
48from allmydata import client
49from allmydata.interfaces import DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE
50
51import pytest_twisted
52
53
def block_with_timeout(deferred, reactor, timeout=120):
    """Block until the Deferred has a result, but time out instead of waiting forever."""
    deferred.addTimeout(timeout, reactor)
    return pytest_twisted.blockon(deferred)

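# Usage sketch for `block_with_timeout` (illustrative, not executed here):
# it lets synchronous test code wait on any Deferred-returning API.
# `some_deferred` stands in for a hypothetical pending Deferred.
#
#     from twisted.internet import reactor
#     result = block_with_timeout(some_deferred, reactor, timeout=60)
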
class _ProcessExitedProtocol(ProcessProtocol):
    """
    Internal helper that .callback()s on self.done when the process
    exits (for any reason).
    """

    def __init__(self):
        self.done = Deferred()

    def processEnded(self, reason):
        self.done.callback(None)

class ProcessFailed(Exception):
    """
    A subprocess has failed.

    :ivar ProcessTerminated reason: the original reason from .processExited

    :ivar bytes output: all stdout and stderr collected to this point.
    """

    def __init__(self, reason, output):
        self.reason = reason
        self.output = output

    def __str__(self):
        return "<ProcessFailed: {}>:\n{}".format(self.reason, self.output)

class _CollectOutputProtocol(ProcessProtocol):
    """
    Internal helper. Collects all output (stdout + stderr) into
    self.output, and fires ``self.done`` with all of it after the
    process exits (for any reason).
    """

    def __init__(self, capture_stderr=True, stdin=None):
        self.done = Deferred()
        self.output = BytesIO()
        self.capture_stderr = capture_stderr
        self._stdin = stdin

    def connectionMade(self):
        if self._stdin is not None:
            self.transport.write(self._stdin)
            self.transport.closeStdin()

    def processEnded(self, reason):
        if not self.done.called:
            self.done.callback(self.output.getvalue())

    def processExited(self, reason):
        if not isinstance(reason.value, ProcessDone):
            self.done.errback(ProcessFailed(reason, self.output.getvalue()))

    def outReceived(self, data):
        self.output.write(data)

    def errReceived(self, data):
        if self.capture_stderr:
            self.output.write(data)

class _DumpOutputProtocol(ProcessProtocol):
    """
    Internal helper. Writes all of a process's output to a file-like
    object (``sys.stdout`` by default) as it arrives.
    """
    def __init__(self, f):
        self.done = Deferred()
        self._out = f if f is not None else sys.stdout

    def processEnded(self, reason):
        if not self.done.called:
            self.done.callback(None)

    def processExited(self, reason):
        if not isinstance(reason.value, ProcessDone):
            self.done.errback(reason)

    def outReceived(self, data):
        data = str(data, sys.stdout.encoding)
        self._out.write(data)

    def errReceived(self, data):
        data = str(data, sys.stdout.encoding)
        self._out.write(data)

class _MagicTextProtocol(ProcessProtocol):
    """
    Internal helper. Monitors all stdout looking for a magic string;
    fires ``self.magic_seen`` when it appears and ``self.exited`` when
    the process ends.
    """

    def __init__(self, magic_text: str, name: str) -> None:
        self.magic_seen = Deferred()
        self.name = f"{name}: "
        self.exited = Deferred()
        self._magic_text = magic_text
        self._output = StringIO()

    def processEnded(self, reason):
        self.exited.callback(None)

    def outReceived(self, data):
        data = str(data, sys.stdout.encoding)
        for line in data.splitlines():
            sys.stdout.write(self.name + line + "\n")
        self._output.write(data)
        if not self.magic_seen.called and self._magic_text in self._output.getvalue():
            print("Saw '{}' in the logs".format(self._magic_text))
            self.magic_seen.callback(self)

    def errReceived(self, data):
        data = str(data, sys.stderr.encoding)
        for line in data.splitlines():
            sys.stdout.write(self.name + line + "\n")

def _cleanup_process_async(transport: IProcessTransport) -> None:
    """
    If the given process transport seems to still be associated with a
    running process, send a SIGTERM to that process.

    :param transport: The transport to use.
    """
    if transport.pid is None:
        # in cases of "restart", we will have registered a finalizer
        # that will kill the process -- but already explicitly killed
        # it (and then ran again) due to the "restart". So, if the
        # process is already killed, our job is done.
        print("Process already cleaned up and that's okay.")
        return
    print("signaling {} with TERM".format(transport.pid))
    try:
        transport.signalProcess('TERM')
    except ProcessExitedAlready:
        # The transport object thought it still had a process but the real OS
        # process has already exited.  That's fine.  We accomplished what we
        # wanted to.
        pass

def _cleanup_tahoe_process(tahoe_transport, exited):
    """
    Terminate the given process with a kill signal (SIGTERM on POSIX,
    TerminateProcess on Windows).

    :param tahoe_transport: The `IProcessTransport` representing the process.
    :param exited: A `Deferred` which fires when the process has exited.

    :return: ``None``, once the process has exited.
    """
    from twisted.internet import reactor
    _cleanup_process_async(tahoe_transport)
    print(f"signaled, blocking on exit {exited}")
    block_with_timeout(exited, reactor)
    print("exited, goodbye")

def run_tahoe(reactor, request, *args, **kwargs):
    """
    Helper to run tahoe with optional coverage.

    :returns: a Deferred that fires with the collected output when the
        command is done (or errbacks with ProcessFailed if it exits
        non-zero)
    """
    stdin = kwargs.get("stdin", None)
    protocol = _CollectOutputProtocol(stdin=stdin)
    process = _tahoe_runner_optional_coverage(protocol, reactor, request, args)
    process.exited = protocol.done
    return protocol.done

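# Usage sketch for `run_tahoe` (illustrative, not executed here): running a
# one-shot CLI command from a pytest_twisted test, where `reactor` and
# `request` are the usual fixtures.
#
#     output = yield run_tahoe(reactor, request, "--version")
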
def _tahoe_runner_optional_coverage(proto, reactor, request, other_args):
    """
    Internal helper. Calls spawnProcess with `-m
    allmydata.scripts.runner` and `other_args`, optionally inserting a
    `--coverage` option if the `request` indicates we should.
    """
    if request.config.getoption('coverage', False):
        args = [sys.executable, '-b', '-m', 'coverage', 'run', '-m', 'allmydata.scripts.runner', '--coverage']
    else:
        args = [sys.executable, '-b', '-m', 'allmydata.scripts.runner']
    args += other_args
    return reactor.spawnProcess(
        proto,
        sys.executable,
        args,
        env=environ,
    )

class TahoeProcess(object):
    """
    A running Tahoe process, with associated information.
    """

    def __init__(self, process_transport, node_dir):
        self._process_transport = process_transport  # IProcessTransport instance
        self._node_dir = node_dir  # path

    @property
    def transport(self):
        return self._process_transport

    @property
    def node_dir(self):
        return self._node_dir

    def get_config(self):
        return client.read_config(
            self._node_dir,
            u"portnum",
        )

    def kill(self):
        """
        Kill the process, block until it's done.
        Does nothing if the process is already stopped (or never started).
        """
        print(f"TahoeProcess.kill({self.transport.pid} / {self.node_dir})")
        _cleanup_tahoe_process(self.transport, self.transport.exited)

    def kill_async(self):
        """
        Kill the process, return a Deferred that fires when it's done.
        Does nothing if the process is already stopped (or never started).
        """
        print(f"TahoeProcess.kill_async({self.transport.pid} / {self.node_dir})")
        _cleanup_process_async(self.transport)
        return self.transport.exited

    def restart_async(self, reactor: IReactorProcess, request: Any) -> Deferred:
        """
        Stop and then re-start the associated process.

        :return: A Deferred that fires after the new process is ready to
            handle requests.
        """
        d = self.kill_async()
        d.addCallback(lambda ignored: _run_node(reactor, self.node_dir, request, None))
        def got_new_process(proc):
            # Grab the new transport since the one we had before is no longer
            # valid after the stop/start cycle.
            self._process_transport = proc.transport
        d.addCallback(got_new_process)
        return d

    def __str__(self):
        return "<TahoeProcess in '{}'>".format(self._node_dir)

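# Usage sketch for `TahoeProcess.restart_async` (illustrative, not executed
# here): restarting a running node from a pytest_twisted test, where `alice`
# is assumed to be a TahoeProcess fixture.  The same TahoeProcess object is
# updated in place with the new transport.
#
#     yield alice.restart_async(reactor, request)
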
def _run_node(reactor, node_dir, request, magic_text):
    """
    Run a tahoe process from its node_dir.

    :returns: a Deferred that fires with a TahoeProcess for this node
        once the magic text has been seen in its logs
    """
    if magic_text is None:
        magic_text = "client running"
    protocol = _MagicTextProtocol(magic_text, basename(node_dir))

    # "tahoe run" is consistent across Linux/macOS/Windows, unlike the old
    # "start" command.
    transport = _tahoe_runner_optional_coverage(
        protocol,
        reactor,
        request,
        [
            '--eliot-destination', 'file:{}/logs/eliot.json'.format(node_dir),
            'run',
            node_dir,
        ],
    )
    transport.exited = protocol.exited

    tahoe_process = TahoeProcess(
        transport,
        node_dir,
    )

    request.addfinalizer(tahoe_process.kill)

    d = protocol.magic_seen
    d.addCallback(lambda ignored: tahoe_process)
    return d

def basic_node_configuration(request, flog_gatherer, node_dir: str):
    """
    Set up common configuration options for a node, given a ``pytest``
    request fixture.  ``flog_gatherer`` is the log gatherer furl to
    configure.
    """
    config_path = join(node_dir, 'tahoe.cfg')
    config = get_config(config_path)
    set_config(
        config,
        u'node',
        u'log_gatherer.furl',
        flog_gatherer,
    )
    force_foolscap = request.config.getoption("force_foolscap")
    assert force_foolscap in (True, False)
    set_config(
        config,
        'storage',
        'force_foolscap',
        str(force_foolscap),
    )
    set_config(
        config,
        'client',
        'force_foolscap',
        str(force_foolscap),
    )
    write_config(FilePath(config_path), config)

def _create_node(reactor, request, temp_dir, introducer_furl, flog_gatherer, name, web_port,
                 storage=True,
                 magic_text=None,
                 needed=2,
                 happy=3,
                 total=4):
    """
    Helper to create a single node and run it.

    :returns: a Deferred that fires with a TahoeProcess for the node
    """
    node_dir = join(temp_dir, name)
    if web_port is None:
        web_port = ''
    if exists(node_dir):
        created_d = succeed(None)
    else:
        print("creating: {}".format(node_dir))
        mkdir(node_dir)
        done_proto = _ProcessExitedProtocol()
        args = [
            'create-node',
            '--nickname', name,
            '--introducer', introducer_furl,
            '--hostname', 'localhost',
            '--listen', 'tcp',
            '--webport', web_port,
            '--shares-needed', str(needed),
            '--shares-happy', str(happy),
            '--shares-total', str(total),
            '--helper',
        ]
        if not storage:
            args.append('--no-storage')
        args.append(node_dir)

        _tahoe_runner_optional_coverage(done_proto, reactor, request, args)
        created_d = done_proto.done

        def created(_):
            basic_node_configuration(request, flog_gatherer.furl, node_dir)
        created_d.addCallback(created)

    d = succeed(None)
    d.addCallback(lambda _: created_d)
    d.addCallback(lambda _: _run_node(reactor, node_dir, request, magic_text))
    return d

class UnwantedFilesException(Exception):
    """
    While waiting for some files to appear, some undesired files
    appeared instead (or in addition).
    """
    def __init__(self, waiting, unwanted):
        super(UnwantedFilesException, self).__init__(
            u"While waiting for '{}', unwanted files appeared: {}".format(
                waiting,
                u', '.join(unwanted),
            )
        )


class ExpectedFileMismatchException(Exception):
    """
    A file we were waiting for still had the wrong contents when the
    timeout expired.
    """
    def __init__(self, path, timeout):
        super(ExpectedFileMismatchException, self).__init__(
            u"Contents of '{}' mismatched after {}s".format(path, timeout),
        )


class ExpectedFileUnfoundException(Exception):
    """
    A file or files we expected to find didn't appear within the
    timeout.
    """
    def __init__(self, path, timeout):
        super(ExpectedFileUnfoundException, self).__init__(
            u"Didn't find '{}' after {}s".format(path, timeout),
        )


class FileShouldVanishException(Exception):
    """
    A file or files we expected to disappear did not do so within the
    timeout.
    """
    def __init__(self, path, timeout):
        super(FileShouldVanishException, self).__init__(
            u"'{}' still exists after {}s".format(path, timeout),
        )

def run_in_thread(f):
    """Decorator for integration tests that runs code in a thread.

    Because we're using pytest_twisted, tests that rely on the reactor are
    expected to return a Deferred and use async APIs so the reactor can run.

    In the case of the integration test suite, it launches nodes in the
    background using Twisted APIs.  The nodes' stdout and stderr are read via
    Twisted code.  If the reactor doesn't run, reads don't happen, and
    eventually the buffers fill up, and the nodes block when they try to
    flush logs.

    We could switch to Twisted APIs (treq instead of requests etc.), but
    sometimes it's easier or more expedient to just have a blocking test.  So
    this decorator allows you to run the test in a thread, while the reactor
    keeps running in the main thread.

    See https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3597 for the tracking bug.
    """
    @wraps(f)
    def test(*args, **kwargs):
        return deferToThread(lambda: f(*args, **kwargs))
    return test

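# Usage sketch for `run_in_thread` (illustrative, not executed here): a
# hypothetical blocking test; the reactor keeps running in the main thread
# while the test body blocks in a worker thread.
#
#     @run_in_thread
#     def test_get_welcome_json(alice):
#         js = json.loads(web_get(alice, u"", params={u"t": u"json"}))
#         assert u"servers" in js
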
def await_file_contents(path, contents, timeout=15, error_if=None):
    """
    Wait up to `timeout` seconds for the file at `path` (any path-like
    object) to have the exact content `contents`.

    :param error_if: if specified, a list of additional paths; if any
        of these paths appear an Exception is raised.
    """
    start_time = time.time()
    while time.time() - start_time < timeout:
        print("  waiting for '{}'".format(path))
        if error_if and any(exists(p) for p in error_if):
            raise UnwantedFilesException(
                waiting=path,
                unwanted=[p for p in error_if if exists(p)],
            )
        if exists(path):
            try:
                with open(path, 'r') as f:
                    current = f.read()
            except IOError:
                print("IOError; trying again")
            else:
                if current == contents:
                    return True
                print("  file contents still mismatched")
                print("  wanted: {}".format(contents.replace('\n', ' ')))
                print("     got: {}".format(current.replace('\n', ' ')))
        time.sleep(1)
    if exists(path):
        raise ExpectedFileMismatchException(path, timeout)
    raise ExpectedFileUnfoundException(path, timeout)

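# Usage sketch for `await_file_contents` (illustrative, not executed here);
# the paths and contents are hypothetical.
#
#     await_file_contents(
#         join(node_dir, "private", "status"), "ready\n",
#         timeout=30,
#         error_if=[join(node_dir, "private", "error")],
#     )
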
def await_files_exist(paths, timeout=15, await_all=False):
    """
    Wait up to `timeout` seconds for any of the paths to exist (or for
    all of them, if `await_all` is True); when they do, a list of all
    found filenames is returned. Otherwise, an Exception is raised.
    """
    start_time = time.time()
    while time.time() - start_time < timeout:
        print("  waiting for: {}".format(' '.join(paths)))
        found = [p for p in paths if exists(p)]
        print("found: {}".format(found))
        if await_all:
            if len(found) == len(paths):
                return found
        else:
            if len(found) > 0:
                return found
        time.sleep(1)
    if await_all:
        nice_paths = ' and '.join(paths)
    else:
        nice_paths = ' or '.join(paths)
    raise ExpectedFileUnfoundException(nice_paths, timeout)

def await_file_vanishes(path, timeout=10):
    """
    Wait up to `timeout` seconds for the file at `path` to disappear;
    raise FileShouldVanishException if it is still present afterwards.
    """
    start_time = time.time()
    while time.time() - start_time < timeout:
        print("  waiting for '{}' to vanish".format(path))
        if not exists(path):
            return
        time.sleep(1)
    raise FileShouldVanishException(path, timeout)

def cli(node, *argv):
    """
    Run a tahoe CLI subcommand for a given node in a blocking manner,
    returning the output.
    """
    arguments = ["tahoe", '--node-directory', node.node_dir]
    return check_output(arguments + list(argv))

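# Usage sketch for `cli` (illustrative, not executed here): `alice` is
# assumed to be a TahoeProcess and `dircap` a capability string obtained
# elsewhere.
#
#     listing = cli(alice, "ls", dircap).decode("utf-8")
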
def node_url(node_dir, uri_fragment):
    """
    Create a fully qualified URL by reading config from `node_dir` and
    adding the `uri_fragment`.
    """
    with open(join(node_dir, "node.url"), "r") as f:
        base = f.read().strip()
    url = base + uri_fragment
    return url

def _check_status(response):
    """
    Check that the response code is a 2xx (raise an exception otherwise).
    """
    if response.status_code < 200 or response.status_code >= 300:
        raise ValueError(
            "Expected a 2xx code, got {}".format(response.status_code)
        )

def web_get(tahoe, uri_fragment, **kwargs):
    """
    Make a GET request to the webport of `tahoe` (a `TahoeProcess`,
    usually from a fixture e.g. `alice`). This will look like:
    `http://localhost:<webport>/<uri_fragment>`. All `kwargs` are
    passed on to `requests.get`.
    """
    url = node_url(tahoe.node_dir, uri_fragment)
    resp = requests.get(url, **kwargs)
    _check_status(resp)
    return resp.content


def web_post(tahoe, uri_fragment, **kwargs):
    """
    Make a POST request to the webport of `tahoe` (a `TahoeProcess`,
    usually from a fixture e.g. `alice`). This will look like:
    `http://localhost:<webport>/<uri_fragment>`. All `kwargs` are
    passed on to `requests.post`.
    """
    url = node_url(tahoe.node_dir, uri_fragment)
    resp = requests.post(url, **kwargs)
    _check_status(resp)
    return resp.content

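# Usage sketch for `web_get`/`web_post` (illustrative, not executed here):
# `alice` is assumed to be a TahoeProcess fixture.  The GET mirrors what
# `await_client_ready` below does; the POST shows a hypothetical `t=upload`
# form submission to the web API.
#
#     js = json.loads(web_get(alice, u"", params={u"t": u"json"}))
#     cap = web_post(
#         alice, u"uri",
#         data={u"t": u"upload"},
#         files={u"file": b"some file contents"},
#     ).decode("ascii")
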
@run_in_thread
def await_client_ready(tahoe, timeout=10, liveness=60*2, minimum_number_of_servers=1):
    """
    Uses the status API to wait for a client-type node (in `tahoe`, a
    `TahoeProcess` instance usually from a fixture e.g. `alice`) to be
    'ready'. A client is deemed ready if:

      - it answers `http://<node_url>/?t=json`
      - there are at least `minimum_number_of_servers` storage-servers
        connected
      - at least `minimum_number_of_servers` of them have a
        "last_received_data" within the last `liveness` seconds

    We will try for up to `timeout` seconds for the above conditions
    to be true. Otherwise, an exception is raised.
    """
    start = time.time()
    while (time.time() - start) < float(timeout):
        try:
            data = web_get(tahoe, u"", params={u"t": u"json"})
            js = json.loads(data)
        except Exception as e:
            print("waiting because '{}'".format(e))
            time.sleep(1)
            continue
        servers = js['servers']

        if len(servers) < minimum_number_of_servers:
            print(f"waiting because {len(servers)} connected servers is fewer than required ({minimum_number_of_servers})")
            time.sleep(1)
            continue

        now = time.time()
        server_times = [
            server['last_received_data']
            for server
            in servers
            if server['last_received_data'] is not None
        ]
        print(
            f"Now: {time.ctime(now)}\n"
            f"Liveness required: {liveness}\n"
            f"Server last-received-data: {[time.ctime(s) for s in server_times]}\n"
            f"Server ages: {[now - s for s in server_times]}\n"
        )

        # check that enough times are 'recent enough' (it's OK if _some_
        # servers are down, we just want to make sure a sufficient number
        # are up)
        alive = [t for t in server_times if now - t <= liveness]
        if len(alive) < minimum_number_of_servers:
            print(
                f"waiting because we found {len(alive)} live servers "
                f"and want {minimum_number_of_servers}"
            )
            time.sleep(1)
            continue

        # we have a status with enough servers, and all of them have been
        # contacted recently
        return True
    # we only fall out of the loop when we've timed out
    raise RuntimeError(
        "Waited {} seconds for {} to be 'ready' but it never was".format(
            timeout,
            tahoe,
        )
    )

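# Usage sketch for `await_client_ready` (illustrative, not executed here):
# because of the @run_in_thread decorator it returns a Deferred, so a
# pytest_twisted test would yield (or await) it.  `alice` is assumed to be
# a TahoeProcess fixture.
#
#     yield await_client_ready(alice, minimum_number_of_servers=2)
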
def generate_ssh_key(path):
    """Create a new SSH private/public key pair."""
    key = RSAKey.generate(2048)
    key.write_private_key_file(path)
    with open(path + ".pub", "wb") as f:
        s = "%s %s" % (key.get_name(), key.get_base64())
        f.write(s.encode("ascii"))

@frozen
class CHK:
    """
    Represent the CHK encoding sufficiently to run a ``tahoe put`` command
    using it.
    """
    kind = "chk"
    max_shares = 256

    def customize(self) -> CHK:
        # Nothing to do.
        return self

    @classmethod
    def load(cls, params: None) -> CHK:
        assert params is None
        return cls()

    def to_json(self) -> None:
        return None

    @contextmanager
    def to_argv(self) -> None:
        yield []

@frozen
class SSK:
    """
    Represent the SSK encodings (SDMF and MDMF) sufficiently to run a
    ``tahoe put`` command using one of them.
    """
    kind = "ssk"

    # SDMF and MDMF encode share counts (N and k) into the share itself as an
    # unsigned byte.  They could have encoded (share count - 1) to fit the
    # full range supported by ZFEC into the unsigned byte - but they don't.
    # So 256 is inaccessible to those formats and we set the upper bound at
    # 255.
    max_shares = 255

    name: Literal["sdmf", "mdmf"]
    key: None | bytes

    @classmethod
    def load(cls, params: dict) -> SSK:
        assert params.keys() == {"format", "mutable", "key"}
        return cls(params["format"], params["key"].encode("ascii"))

    def customize(self) -> SSK:
        """
        Return an SSK with a newly generated random RSA key.
        """
        return evolve(self, key=generate_rsa_key())

    def to_json(self) -> dict[str, str]:
        return {
            "format": self.name,
            "mutable": None,
            "key": self.key.decode("ascii"),
        }

    @contextmanager
    def to_argv(self) -> None:
        with NamedTemporaryFile() as f:
            f.write(self.key)
            f.flush()
            yield [f"--format={self.name}", "--mutable", f"--private-key-path={f.name}"]

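# Usage sketch for CHK/SSK (illustrative, not executed here): round-tripping
# an SSK description through its JSON form, as configuration-driven tests
# might do.
#
#     ssk = SSK(name="mdmf", key=None).customize()
#     restored = SSK.load(ssk.to_json())
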
def upload(alice: TahoeProcess, fmt: CHK | SSK, data: bytes) -> str:
    """
    Upload the given data to the given node.

    :param alice: The node to upload to.

    :param fmt: The format for the upload: a CHK or SSK instance.

    :param data: The data to upload.

    :return: The capability for the uploaded data.
    """

    with NamedTemporaryFile() as f:
        f.write(data)
        f.flush()
        with fmt.to_argv() as fmt_argv:
            # `alice` is a TahoeProcess, which is what `cli` expects as its
            # first argument.
            argv = [alice, "put"] + fmt_argv + [f.name]
            return cli(*argv).decode("utf-8").strip()

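# Usage sketch for `upload` (illustrative, not executed here): `alice` is
# assumed to be a TahoeProcess fixture.
#
#     chk_cap = upload(alice, CHK(), b"immutable bytes")
#     ssk_cap = upload(alice, SSK(name="sdmf", key=None).customize(), b"mutable bytes")
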
async def reconfigure(reactor, request, node: TahoeProcess,
                      params: tuple[int, int, int],
                      convergence: None | bytes,
                      max_segment_size: None | int = None) -> None:
    """
    Reconfigure a Tahoe-LAFS node with different ZFEC parameters and
    convergence secret.

    TODO This appears to have issues on Windows.

    If the current configuration is different from the specified
    configuration, the node will be restarted so it takes effect.

    :param reactor: A reactor to use to restart the process.
    :param request: The pytest request object to use to arrange process
        cleanup.
    :param node: The Tahoe-LAFS node to reconfigure.
    :param params: The ``happy``, ``needed``, and ``total`` ZFEC encoding
        parameters.
    :param convergence: If given, the convergence secret.  If not given, the
        existing convergence secret will be left alone.
    :param max_segment_size: If given, the maximum immutable segment size to
        configure (a testing-only setting).  If not given, the existing
        setting is left alone.

    :return: ``None`` after the node configuration has been rewritten, the
        node has been restarted, and the node is ready to provide service.
    """
    happy, needed, total = params
    config = node.get_config()

    changed = False
    cur_happy = int(config.get_config("client", "shares.happy"))
    cur_needed = int(config.get_config("client", "shares.needed"))
    cur_total = int(config.get_config("client", "shares.total"))

    if (happy, needed, total) != (cur_happy, cur_needed, cur_total):
        changed = True
        config.set_config("client", "shares.happy", str(happy))
        config.set_config("client", "shares.needed", str(needed))
        config.set_config("client", "shares.total", str(total))

    if convergence is not None:
        cur_convergence = config.get_private_config("convergence").encode("ascii")
        if base32.a2b(cur_convergence) != convergence:
            changed = True
            config.write_private_config("convergence", base32.b2a(convergence))

    if max_segment_size is not None:
        cur_segment_size = int(config.get_config("client", "shares._max_immutable_segment_size_for_testing", DEFAULT_IMMUTABLE_MAX_SEGMENT_SIZE))
        if cur_segment_size != max_segment_size:
            changed = True
            config.set_config(
                "client",
                "shares._max_immutable_segment_size_for_testing",
                str(max_segment_size)
            )

    if changed:
        # restart the node
        print(f"Restarting {node.node_dir} for ZFEC reconfiguration")
        await node.restart_async(reactor, request)
        print("Restarted.  Waiting for ready state.")
        await await_client_ready(node)
        print("Ready.")
    else:
        print("Config unchanged, not restarting.")

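# Usage sketch for `reconfigure` (illustrative, not executed here): switch
# `alice` to 1-of-1 encoding with a fixed convergence secret (the 16-byte
# value here is an arbitrary example).
#
#     await reconfigure(reactor, request, alice, (1, 1, 1), b"\x00" * 16)
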
def generate_rsa_key() -> bytes:
    """
    Generate a 2048 bit RSA key suitable for use with SSKs.
    """
    return rsa.generate_private_key(
        public_exponent=65537,
        key_size=2048,
        backend=default_backend()
    ).private_bytes(
        encoding=Encoding.PEM,
        format=PrivateFormat.TraditionalOpenSSL,
        encryption_algorithm=NoEncryption(),
    )