source: trunk/src/allmydata/test/common_util.py

Last change on this file was f19bf8cf, checked in by Jean-Paul Calderone <exarkun@…>, at 2022-04-11T19:04:55Z

Parameterize the options object to the run_cli helper

  • Property mode set to 100644
File size: 15.8 KB
Line 
1"""
2Ported to Python 3.
3"""
4from __future__ import print_function
5from __future__ import absolute_import
6from __future__ import division
7from __future__ import unicode_literals
8
9from future.utils import PY2, PY3, bchr, binary_type
10from future.builtins import str as future_str
11if PY2:
12    from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, dict, list, object, range, str, max, min  # noqa: F401
13
14import os
15import sys
16import time
17import signal
18from functools import (
19    partial,
20)
21from random import randrange
22if PY2:
23    from StringIO import StringIO
24from io import (
25    TextIOWrapper,
26    BytesIO,
27)
28
29from twisted.internet import reactor, defer
30from twisted.python import failure
31from twisted.trial import unittest
32
33from ..util.assertutil import precondition
34from ..scripts import runner
35from allmydata.util.encodingutil import unicode_platform, get_filesystem_encoding, argv_type, unicode_to_argv
36
37
def skip_if_cannot_represent_filename(u):
    """
    Skip the current test when ``u`` cannot be encoded as a filename here.

    Nothing is checked when ``unicode_platform()`` reports true.  Otherwise
    ``u`` is encoded with the encoding returned by
    ``get_filesystem_encoding()`` and the test is skipped if that fails.

    :param str u: The candidate filename; it must be a text string.

    :raise unittest.SkipTest: If ``u`` cannot be encoded.
    """
    precondition(isinstance(u, str))

    fs_encoding = get_filesystem_encoding()
    if unicode_platform():
        return
    try:
        u.encode(fs_encoding)
    except UnicodeEncodeError:
        raise unittest.SkipTest("A non-ASCII filename could not be encoded on this platform.")
47
48
49def _getvalue(io):
50    """
51    Read out the complete contents of a file-like object.
52    """
53    io.seek(0)
54    return io.read()
55
56
def maybe_unicode_to_argv(o):
    """
    Return ``o`` in argv form when it is text; otherwise return it untouched.

    :param o: Any object.  Only ``str`` instances are passed through
        ``unicode_to_argv``.
    """
    return unicode_to_argv(o) if isinstance(o, str) else o
62
63
def run_cli_native(verb, *args, **kwargs):
    """
    Run a Tahoe-LAFS CLI command given as native strings and capture its
    output.

    "Native" means bytes on Python 2 and Unicode on Python 3.  Most code
    should prefer ``run_cli_unicode``, which deals with the necessary
    encoding considerations.

    :param native_str verb: The command to run.  For example,
        ``"create-node"``.

    :param [native_str] args: The arguments to pass to the command.  For
        example, ``("--hostname=localhost",)``.

    The remaining parameters are read from ``kwargs``; keywords other than
    the ones listed here are ignored.

    :param runner.Options options: The options instance used to parse the
        given arguments.  Defaults to a fresh ``runner.Options()``.

    :param [native_str] nodeargs: Extra arguments to pass to the Tahoe
        executable before ``verb``.

    :param bytes|unicode stdin: Text or bytes to pass to the command via
        stdin.  Defaults to the empty string.

    :param NoneType|str encoding: The name of an encoding which the stdio
        objects will be configured to use.  A false value falls back to
        ``sys.stdout.encoding`` and then to ``"utf-8"``.

    :param bool return_bytes: If False, stdout/stderr are returned as native
        strings.  If True (and running on Python 3), they are returned as
        bytes read from the underlying buffers.

    :return: A ``Deferred`` that fires with ``(code, stdout, stderr)``.
        ``code`` is ``0`` when the parse/dispatch chain succeeds and the
        ``SystemExit`` code when it ends that way.  Any other failure is not
        handled here.
    """
    options = kwargs.pop("options", runner.Options())
    nodeargs = kwargs.pop("nodeargs", [])
    encoding = kwargs.pop("encoding", None)
    if not encoding:
        encoding = sys.stdout.encoding or "utf-8"
    return_bytes = kwargs.pop("return_bytes", False)

    # Normalise every argument to the platform's argv representation.
    verb = maybe_unicode_to_argv(verb)
    args = [maybe_unicode_to_argv(a) for a in args]
    nodeargs = [maybe_unicode_to_argv(a) for a in nodeargs]
    precondition(
        all(isinstance(arg, argv_type) for arg in [verb] + nodeargs + list(args)),
        "arguments to run_cli must be {argv_type} -- convert using unicode_to_argv".format(argv_type=argv_type),
        verb=verb,
        args=args,
        nodeargs=nodeargs,
    )
    argv = ["tahoe"] + nodeargs + [verb] + list(args)

    stdin = kwargs.get("stdin", "")
    if PY2:
        # Python 2 keeps the historical behaviour: bytes or unicode are both
        # accepted and converted implicitly.  That only works well for ASCII
        # or a suitable LANG, so it is something to move away from.
        stdin = StringIO(stdin)
        stdout = StringIO()
        stderr = StringIO()
        # StringIO ignores ``encoding`` on Python 2; it is set purely so the
        # fake streams resemble real ones.
        for stream in (stdin, stdout, stderr):
            stream.encoding = encoding
    else:
        # Python 3 takes unicode and encodes it with one explicit encoding.
        # Older Python 3 derives that encoding from LANG; newer versions use
        # LANG only if it supports full Unicode and utf-8 otherwise.  Tests
        # may pass other encodings to exercise different behaviours.
        if isinstance(stdin, str):
            stdin = stdin.encode(encoding)
        stdin = TextIOWrapper(BytesIO(stdin), encoding)
        stdout = TextIOWrapper(BytesIO(), encoding)
        stderr = TextIOWrapper(BytesIO(), encoding)

    def _captured_output():
        # With return_bytes on Python 3, read the raw buffers beneath the
        # text wrappers instead of the wrappers themselves.
        out, err = stdout, stderr
        if return_bytes and PY3:
            out = out.buffer
            err = err.buffer
        return _getvalue(out), _getvalue(err)

    def _done(rc):
        # The value produced by dispatch is not used; success is reported as 0.
        out, err = _captured_output()
        return 0, out, err

    def _err(f):
        f.trap(SystemExit)
        out, err = _captured_output()
        return f.value.code, out, err

    d = defer.succeed(argv)
    d.addCallback(partial(runner.parse_or_exit, options), stdout=stdout, stderr=stderr)
    d.addCallback(runner.dispatch, stdin=stdin, stdout=stdout, stderr=stderr)
    d.addCallbacks(_done, _err)
    return d
165
166
def run_cli_unicode(verb, argv, nodeargs=None, stdin=None, encoding=None):
    """
    Run a Tahoe-LAFS CLI command described entirely with unicode strings.

    :param unicode verb: The command to run.  For example, ``u"create-node"``.

    :param [unicode] argv: The arguments to pass to the command.  For example,
        ``[u"--hostname=localhost"]``.

    :param [unicode] nodeargs: Extra arguments to pass to the Tahoe executable
        before ``verb``.  ``None`` means no extra arguments.

    :param unicode stdin: Text to pass to the command via stdin.

    :param NoneType|str encoding: The name of the encoding used for the
        bytes/unicode conversions performed here.  It is also forwarded
        unchanged to ``run_cli_native``.  When it is ``None``, ASCII is used
        for the conversions done in this function.

    :return: A ``Deferred`` firing with ``(code, stdout, stderr)`` where any
        bytes output has been decoded to unicode.
    """
    nodeargs = [] if nodeargs is None else nodeargs
    precondition(
        all(isinstance(arg, future_str) for arg in [verb] + nodeargs + argv),
        "arguments to run_cli_unicode must be unicode",
        verb=verb,
        nodeargs=nodeargs,
        argv=argv,
    )
    codec = encoding or "ascii"

    def to_native(text):
        # On Python 3 command-line parsing expects Unicode, so only Python 2
        # needs an encoding step (and ``None`` is always passed through).
        if PY2 and text is not None:
            return text.encode(codec)
        return text

    d = run_cli_native(
        to_native(verb),
        *[to_native(arg) for arg in argv],
        nodeargs=[to_native(arg) for arg in nodeargs],
        stdin=to_native(stdin),
        encoding=encoding
    )

    def maybe_decode(result):
        code, out, err = result
        out, err = [
            stream.decode(codec) if isinstance(stream, bytes) else stream
            for stream in (out, err)
        ]
        return code, out, err

    d.addCallback(maybe_decode)
    return d
217
218
# ``run_cli`` is an alias for the native-string variant, ``run_cli_native``.
run_cli = run_cli_native
220
221
def parse_cli(*argv):
    """
    Synchronously parse CLI arguments via ``runner.parse_options``.

    :param argv: The command-line arguments, passed on as a tuple.

    :return: The result of ``runner.parse_options`` (the Options object).

    :raise usage.UsageError: If something went wrong while parsing.
    """
    options = runner.parse_options(argv)
    return options
226
class DevNullDictionary(dict):
    """
    A ``dict`` subclass whose item assignment silently does nothing.

    Only ``__setitem__`` is overridden; contents supplied to the constructor
    are stored by ``dict`` as usual.
    """

    def __setitem__(self, key, value):
        # Deliberately discard the assignment.
        pass
230
def insecurerandstr(n):
    """
    Return ``n`` random bytes joined into a single byte string.

    Each byte comes from ``randrange(0, 256)``, i.e. the ordinary ``random``
    module rather than a cryptographic source.
    """
    return b"".join(bchr(randrange(0, 256)) for _ in range(n))
233
def flip_bit(good, which):
    """
    Return a copy of ``good`` with the low-order bit of ``good[which]``
    flipped.

    :param bytes good: The byte string to corrupt.

    :param int which: Index of the byte to change.
    """
    # For the last byte addressed as -1, ``which + 1`` is 0, so the slices
    # must be open-ended and the tail empty.
    end = None if which == -1 else which + 1
    head = good[:which]
    target = good[which:end]
    tail = b"" if end is None else good[end:]
    return head + bchr(ord(target) ^ 0x01) + tail
241
def flip_one_bit(s, offset=0, size=None):
    """
    Flip one randomly chosen bit of the byte string ``s``.

    The affected byte has an index greater than or equal to ``offset`` and
    less than ``offset + size``.

    :param bytes s: The byte string to corrupt.

    :param int offset: Index of the first byte eligible for corruption.

    :param int|NoneType size: Number of eligible bytes; ``None`` means
        everything from ``offset`` to the end of ``s``.

    :return: A byte string differing from ``s`` in exactly one bit.
    """
    precondition(isinstance(s, binary_type))
    if size is None:
        size = len(s) - offset
    i = randrange(offset, offset + size)
    # Read the byte before drawing the bit position, matching the order in
    # which the random source is consumed.
    original_byte = ord(s[i:i + 1])
    mask = 0x01 << randrange(0, 8)
    result = s[:i] + bchr(original_byte ^ mask) + s[i + 1:]
    assert result != s, "Internal error -- flip_one_bit() produced the same string as its input: %s == %s" % (result, s)
    return result
252
253
class ReallyEqualMixin(object):
    """
    Mixin adding an equality assertion that also compares the operands' types.
    """

    def failUnlessReallyEqual(self, a, b, msg=None):
        """
        Assert that ``a`` and ``b`` are equal and of the same type.

        :param a: First value.
        :param b: Second value.
        :param msg: Optional message passed to the equality assertion and
            included in the type-mismatch message.
        """
        self.assertEqual(a, b, msg)
        # Future's newstr (the backported Unicode type) and Python 2's native
        # unicode compare equal and are logically the same type but are
        # distinct classes.  Pass such values through ``str`` so the type
        # comparison below is made on a consistent type.
        a, b = [
            str(value) if value.__class__ == future_str else value
            for value in (a, b)
        ]
        self.assertEqual(type(a), type(b), "a :: %r (%s), b :: %r (%s), %r" % (a, type(a), b, type(b), msg))
266
267
class SignalMixin(object):
    """
    Mixin for tests that use Processes outside the usual ``reactor.run()``
    environment.

    It is copied from Twisted's ``twisted.test.test_process``.  Note that
    Twisted-8.2.0 uses something rather different.
    """
    # Value returned by ``signal.signal`` when ``setUp`` installed the
    # reactor's handler, or None if nothing was installed.
    sigchldHandler = None

    def setUp(self):
        """
        Install the reactor's SIGCHLD handler, then defer to the next
        ``setUp``.
        """
        # The handler would normally be installed by reactor.run(), but the
        # reactor may not have been run yet when this test starts.
        can_install = hasattr(reactor, "_handleSigchld") and hasattr(signal, "SIGCHLD")
        if can_install:
            self.sigchldHandler = signal.signal(signal.SIGCHLD, reactor._handleSigchld)
        return super(SignalMixin, self).setUp()

    def tearDown(self):
        """
        Put back the handler saved by ``setUp``, then defer to the next
        ``tearDown``.
        """
        # NOTE(review): this is a truthiness test, so a falsy saved handler
        # is not re-installed -- confirm that this is intended.
        previous = self.sigchldHandler
        if previous:
            signal.signal(signal.SIGCHLD, previous)
        return super(SignalMixin, self).tearDown()
288
289
class StallMixin(object):
    """Mixin providing a reactor-driven delay for Deferred chains."""

    def stall(self, res=None, delay=1):
        """
        Return a ``Deferred`` that fires with ``res`` after ``delay``.

        :param res: The value the returned Deferred fires with.
        :param delay: The delay handed to ``reactor.callLater``.
        """
        stalled = defer.Deferred()
        reactor.callLater(delay, stalled.callback, res)
        return stalled
295
296
class Marker(object):
    """An empty class whose instances serve as unique, hashable tokens."""
299
class FakeCanary(object):
    """
    A stand-in canary for use in storage tests.

    Disconnect callbacks are recorded in ``disconnectors`` and run when the
    test calls ``disconnected()``.
    """

    def __init__(self, ignore_disconnectors=False):
        """
        :param bool ignore_disconnectors: If true, registration and
            unregistration of disconnect callbacks are no-ops.
        """
        self.ignore = ignore_disconnectors
        # Maps Marker -> (callable, args, kwargs); set to None once
        # ``disconnected()`` has run.
        self.disconnectors = {}

    def notifyOnDisconnect(self, f, *args, **kwargs):
        """
        Record ``f(*args, **kwargs)`` to be called on disconnect.

        :return: A ``Marker`` identifying the registration, or ``None`` when
            disconnectors are being ignored.
        """
        if not self.ignore:
            marker = Marker()
            self.disconnectors[marker] = (f, args, kwargs)
            return marker
        return None

    def dontNotifyOnDisconnect(self, marker):
        """Forget the registration identified by ``marker``."""
        if not self.ignore:
            del self.disconnectors[marker]

    def getRemoteTubID(self):
        """Always return ``None``."""
        return None

    def getPeer(self):
        """Always return the placeholder string ``"<fake>"``."""
        return "<fake>"

    def disconnected(self):
        """Disconnect the canary, to be called by test code.

        Can only happen once: later calls do nothing.
        """
        if self.disconnectors is None:
            return
        # Snapshot the values before calling anything.
        registered = list(self.disconnectors.values())
        for callback, args, kwargs in registered:
            callback(*args, **kwargs)
        self.disconnectors = None
330
331
class ShouldFailMixin(object):
    """Mixin providing a Deferred-friendly "this call must fail" assertion."""

    def shouldFail(self, expected_failure, which, substring,
                   callable, *args, **kwargs):
        """Assert that a function call raises some exception. This is a
        Deferred-friendly version of TestCase.assertRaises() .

        Suppose you want to verify the following function:

         def broken(a, b, c):
             if a < 0:
                 raise TypeError('a must not be negative')
             return defer.succeed(b+c)

        You can use:
            d = self.shouldFail(TypeError, 'test name',
                                'a must not be negative',
                                broken, -4, 5, c=12)
        in your test method. The 'test name' string will be included in the
        error message, if any, because Deferred chains frequently make it
        difficult to tell which assertion was tripped.

        The substring= argument, if not None, must appear in the 'str' of
        the Failure, or the test will fail. A bytes substring is decoded as
        UTF-8 before it is compared.

        :param expected_failure: The exception type(s) passed to
            ``Failure.trap``.
        :param which: A label included in assertion messages.
        :param substring: ``None``, or text/bytes expected in ``str(failure)``.
        :param callable: The function to invoke via ``defer.maybeDeferred``.
        :param args: Positional arguments for ``callable``.
        :param kwargs: Keyword arguments for ``callable``.

        :return: A Deferred which fires with a one-element list holding the
            trapped Failure.
        """

        assert substring is None or isinstance(substring, (bytes, str))
        if isinstance(substring, bytes):
            # ``bytes in str`` raises TypeError on Python 3, so compare text
            # against text.
            substring = substring.decode("utf-8")
        d = defer.maybeDeferred(callable, *args, **kwargs)
        def done(res):
            if isinstance(res, failure.Failure):
                res.trap(expected_failure)
                if substring:
                    rendered = str(res)
                    self.failUnless(substring in rendered,
                                    "%s: substring '%s' not in '%s'"
                                    % (which, substring, rendered))
                # return the Failure for further analysis, but in a form that
                # doesn't make the Deferred chain think that we failed.
                return [res]
            else:
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
        d.addBoth(done)
        return d
375
376
class TestMixin(SignalMixin):
    """
    ``SignalMixin`` plus a ``tearDown`` that insists the reactor has no
    delayed calls left.
    """

    def setUp(self):
        """Defer to the next ``setUp`` in the MRO."""
        return super(TestMixin, self).setUp()

    def tearDown(self):
        """
        Cancel leftover delayed calls, failing the test if there were any,
        then defer to the next ``tearDown``.
        """
        self.clean_pending(required_to_quiesce=True)
        return super(TestMixin, self).tearDown()

    def clean_pending(self, dummy=None, required_to_quiesce=True):
        """
        Cancel every delayed call currently pending on the reactor.

        When writing a unit test, ask whether the code under test is
        required to release control once its job is done, so that it can
        never come back later (through a delayed reactor task) and do more
        work.

        If it is, trial already checks that for you: it fails any test that
        leaves pending tasks on the reactor.

        Some code, however, is *not* required to release control.  It may
        legitimately keep rescheduling reactor tasks to do ongoing work, and
        trial will wrongly demand that it clean them all up.

        Such code is often expected to grow an optional "shutdown" operation
        that releases all control.  It is nevertheless good design for some
        code to have no shutdown at all and instead follow a "crash-only"
        design, recovering from a crash at startup.

        For that "long-running" kind of code, which need not shut down
        cleanly to pass its tests, call ``clean_pending()`` at the end of
        the unit test and trial will be satisfied.

        :param dummy: Ignored.

        :param bool required_to_quiesce: If true, fail the test when any
            delayed call was pending.
        """
        pending = reactor.getDelayedCalls()
        # Record this before cancelling anything.
        had_pending = bool(pending)
        for delayed_call in pending:
            if not delayed_call.active():
                print("WEIRDNESS! pending timed call not active!")
                continue
            delayed_call.cancel()
        if required_to_quiesce and had_pending:
            self.fail("Reactor was still active when it was required to be quiescent.")
422
423
class TimezoneMixin(object):
    """
    Mixin for tests that need to run under a specific ``TZ`` setting.

    The host class must provide ``addCleanup``.
    """

    def setTimezone(self, timezone):
        """
        Set the ``TZ`` environment variable for the rest of the test.

        A cleanup is registered with ``self.addCleanup`` that puts ``TZ``
        back to its previous value, or removes it if it was not set.

        :param str timezone: The value to assign to ``TZ``.
        """
        def tzset_if_possible():
            # Windows doesn't have time.tzset().
            if hasattr(time, 'tzset'):
                time.tzset()

        # A private sentinel distinguishes "TZ was not set" from any real
        # value.
        unset = object()
        originalTimezone = os.environ.get('TZ', unset)

        def restoreTimezone():
            if originalTimezone is unset:
                # The test may have removed TZ itself already; tolerate that
                # rather than raising KeyError during cleanup.
                os.environ.pop('TZ', None)
            else:
                os.environ['TZ'] = originalTimezone
            tzset_if_possible()

        os.environ['TZ'] = timezone
        self.addCleanup(restoreTimezone)
        tzset_if_possible()

    def have_working_tzset(self):
        """Return True if this platform's ``time`` module has ``tzset``."""
        return hasattr(time, 'tzset')
447
448
# Names exported by ``from ... import *``.
__all__ = [
    "TestMixin",
    "ShouldFailMixin",
    "StallMixin",
    "run_cli",
    "parse_cli",
    "DevNullDictionary",
    "insecurerandstr",
    "flip_bit",
    "flip_one_bit",
    "SignalMixin",
    "skip_if_cannot_represent_filename",
    "ReallyEqualMixin",
]
Note: See TracBrowser for help on using the repository browser.