source: trunk/src/allmydata/test/common_util.py

Last change on this file was a9015cd, checked in by Itamar Turner-Trauring <itamar@…>, at 2024-03-01T18:06:32Z

Remove another future import.

  • Property mode set to 100644
File size: 14.3 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5import os
6import sys
7import time
8import signal
9from functools import (
10    partial,
11)
12from random import randrange
13from io import (
14    TextIOWrapper,
15    BytesIO,
16)
17
18from twisted.internet import reactor, defer
19from twisted.python import failure
20from twisted.trial import unittest
21
22from ..util.assertutil import precondition
23from ..scripts import runner
24from allmydata.util.encodingutil import unicode_platform, get_filesystem_encoding, argv_type, unicode_to_argv
25
def bchr(s):
    """Return the one-byte ``bytes`` object whose single value is the integer ``s``."""
    single = (s,)
    return bytes(single)
28
29
def skip_if_cannot_represent_filename(u):
    """
    Skip the running test when ``u`` cannot be encoded as a filename here.

    :param str u: The candidate filename.

    :raise unittest.SkipTest: If the platform is not a unicode platform and
        ``u`` cannot be encoded with the filesystem encoding.
    """
    precondition(isinstance(u, str))

    fs_encoding = get_filesystem_encoding()
    if unicode_platform():
        # Nothing to check on platforms reported as unicode platforms.
        return

    try:
        u.encode(fs_encoding)
    except UnicodeEncodeError:
        raise unittest.SkipTest("A non-ASCII filename could not be encoded on this platform.")
39
40
41def _getvalue(io):
42    """
43    Read out the complete contents of a file-like object.
44    """
45    io.seek(0)
46    return io.read()
47
48
def maybe_unicode_to_argv(o):
    """
    Return ``o`` in argv form.

    ``str`` values are passed through ``unicode_to_argv``; anything else is
    returned unchanged.
    """
    return unicode_to_argv(o) if isinstance(o, str) else o
54
55
def run_cli_native(verb, *args, **kwargs):
    """
    Run a Tahoe-LAFS CLI command given as native strings.

    Most code should prefer ``run_cli_unicode`` which deals with all the
    necessary encoding considerations.

    :param str verb: The command to run.  For example,
        ``"create-node"``.

    :param [str] args: The arguments to pass to the command.  For
        example, ``("--hostname=localhost",)``.

    The remaining parameters are accepted as keyword arguments:

    :param runner.Options options: The options instance to use to parse the
        given arguments.  A fresh ``runner.Options()`` is used by default.

    :param [str] nodeargs: Extra arguments to pass to the Tahoe
        executable before ``verb``.

    :param bytes|str|NoneType stdin: Text or bytes to pass to the command
        via stdin.  Text is encoded with ``encoding``; ``None`` means empty
        input.

    :param NoneType|str encoding: The name of an encoding which stdin,
        stdout and stderr will be configured to use.  ``None`` means use
        ``sys.stdout.encoding`` if it is set, otherwise ``"utf-8"``.

    :param bool return_bytes: If False, stdout/stderr are returned as text
        decoded with ``encoding``.  If True, stdout/stderr are returned as
        bytes.

    :return: A Deferred which fires with a three-tuple of
        ``(exit code, stdout contents, stderr contents)``.  If the command
        ends by raising ``SystemExit`` the exit code is that exception's
        ``code``; otherwise it is ``0``.
    """
    options = kwargs.pop("options", runner.Options())
    nodeargs = kwargs.pop("nodeargs", [])
    # Fall back to utf-8 when sys.stdout has been replaced by something that
    # has no (or an empty) ``encoding`` attribute.
    encoding = (
        kwargs.pop("encoding", None)
        or getattr(sys.stdout, "encoding", None)
        or "utf-8"
    )
    return_bytes = kwargs.pop("return_bytes", False)
    stdin = kwargs.pop("stdin", "")

    verb = maybe_unicode_to_argv(verb)
    args = [maybe_unicode_to_argv(a) for a in args]
    nodeargs = [maybe_unicode_to_argv(a) for a in nodeargs]
    precondition(
        all(isinstance(arg, argv_type) for arg in [verb] + nodeargs + list(args)),
        "arguments to run_cli must be {argv_type} -- convert using unicode_to_argv".format(argv_type=argv_type),
        verb=verb,
        args=args,
        nodeargs=nodeargs,
    )
    argv = ["tahoe"] + nodeargs + [verb] + list(args)

    # Accept unicode and encode it using a specific encoding.  Tests can pass
    # in different encodings to exercise different behaviors.
    if stdin is None:
        stdin = b""
    elif isinstance(stdin, str):
        stdin = stdin.encode(encoding)
    stdin = TextIOWrapper(BytesIO(stdin), encoding)
    stdout = TextIOWrapper(BytesIO(), encoding)
    stderr = TextIOWrapper(BytesIO(), encoding)
    options.stdin = stdin

    def _collect(code):
        # Build the (code, stdout, stderr) result from the captured streams.
        if return_bytes:
            # Text written through the wrappers may still be sitting in
            # their internal buffers; push it down to the BytesIO objects
            # before reading those directly.
            stdout.flush()
            stderr.flush()
            out, err = stdout.buffer, stderr.buffer
        else:
            out, err = stdout, stderr
        return code, _getvalue(out), _getvalue(err)

    def _done(rc):
        # The result of the dispatch is ignored; success is reported as 0.
        return _collect(0)

    def _err(f):
        # Only SystemExit is treated as a normal way to finish; anything
        # else is re-raised by trap() and propagates to the caller.
        f.trap(SystemExit)
        return _collect(f.value.code)

    d = defer.succeed(argv)
    d.addCallback(
        partial(
            runner.parse_or_exit,
            options,
        ),
        stdout=stdout,
        stderr=stderr,
    )
    d.addCallback(
        runner.dispatch,
        reactor,
        stdin=stdin,
        stdout=stdout,
        stderr=stderr,
    )
    d.addCallbacks(_done, _err)
    return d
144
145
def run_cli_unicode(verb, argv, nodeargs=None, stdin=None, encoding=None):
    """
    Run a Tahoe-LAFS CLI command using text arguments.

    :param str verb: The command to run.  For example, ``"create-node"``.

    :param [str] argv: The arguments to pass to the command.  For example,
        ``["--hostname=localhost"]``.

    :param [str] nodeargs: Extra arguments to pass to the Tahoe executable
        before ``verb``.  ``None`` means no extra arguments.

    :param str stdin: Text to pass to the command via stdin.

    :param NoneType|str encoding: The name of an encoding, handed on to
        ``run_cli_native``.  It is also used to decode any output that comes
        back as bytes; ``None`` means such output is decoded as ASCII.

    :return: A Deferred which fires with the ``(code, stdout, stderr)``
        result of ``run_cli_native``, with bytes output decoded to text.
    """
    extra = [] if nodeargs is None else nodeargs
    precondition(
        all(isinstance(arg, str) for arg in [verb] + extra + argv),
        "arguments to run_cli_unicode must be unicode",
        verb=verb,
        nodeargs=extra,
        argv=argv,
    )
    codec = encoding or "ascii"

    def _as_text(captured):
        # Leave text alone; decode bytes with the chosen codec.
        if isinstance(captured, bytes):
            return captured.decode(codec)
        return captured

    def maybe_decode(result):
        code, out, err = result
        return code, _as_text(out), _as_text(err)

    d = run_cli_native(
        verb,
        *list(argv),
        nodeargs=list(extra),
        stdin=stdin,
        encoding=encoding
    )
    d.addCallback(maybe_decode)
    return d
192
193
# ``run_cli`` is an alias for ``run_cli_native``.
run_cli = run_cli_native
195
196
def parse_cli(*argv):
    """
    Parse CLI arguments synchronously.

    The positional arguments are handed, as a tuple, to
    ``runner.parse_options`` and its result (the Options object) is
    returned.  ``usage.UsageError`` is thrown if something went wrong.
    """
    parsed = runner.parse_options(argv)
    return parsed
201
class DevNullDictionary(dict):
    """A ``dict`` whose item assignment (``d[key] = value``) is silently discarded."""

    def __setitem__(self, key, value):
        # Deliberately drop the assignment.
        return None
205
def insecurerandstr(n):
    """
    Return ``n`` random bytes.

    The bytes come from ``random.randrange``, which is not a
    cryptographically secure source -- hence the name.

    :param int n: How many bytes to produce.  Zero or a negative number
        gives an empty result.

    :return bytes: The random bytes.
    """
    # bytes() consumes the generator directly, so no intermediate lists of
    # bounds or one-byte strings are needed.
    return bytes(randrange(0, 256) for _ in range(n))
208
def flip_bit(good, which):
    """
    Return a copy of ``good`` with the low-order bit of ``good[which]`` flipped.

    :param bytes good: The original data.
    :param int which: Index of the byte to alter.
    """
    head = good[:which]
    if which == -1:
        # ``good[which+1:]`` would be ``good[0:]`` here, i.e. everything, so
        # the last byte needs its own slicing.
        target, tail = good[-1:], b""
    else:
        target, tail = good[which:which+1], good[which+1:]
    flipped = bchr(ord(target) ^ 0x01)
    return head + flipped + tail
216
def flip_one_bit(s, offset=0, size=None):
    """
    Return a copy of ``s`` with one randomly chosen bit flipped.

    The bit lies in a byte whose index is greater than or equal to
    ``offset`` and less than ``offset+size``.

    :param bytes s: The original data.
    :param int offset: Index of the first byte eligible for the flip.
    :param int size: Number of eligible bytes; ``None`` means every byte
        from ``offset`` to the end of ``s``.
    """
    precondition(isinstance(s, bytes))
    if size is None:
        size = len(s) - offset
    # Choose the byte first, then the bit within it.
    i = randrange(offset, offset+size)
    before, after = s[:i], s[i+1:]
    old_value = ord(s[i:i+1])
    mask = 0x01 << randrange(0, 8)
    result = before + bchr(old_value ^ mask) + after
    assert result != s, "Internal error -- flip_one_bit() produced the same string as its input: %s == %s" % (result, s)
    return result
227
228
class ReallyEqualMixin(object):
    """Mixin adding an equality assertion which also compares types."""

    def failUnlessReallyEqual(self, a, b, msg=None):
        """
        Assert that ``a`` and ``b`` are equal and are of the same type.

        Relies on the host class to supply ``assertEqual``.
        """
        self.assertEqual(a, b, msg)
        kind_a, kind_b = type(a), type(b)
        explanation = "a :: %r (%s), b :: %r (%s), %r" % (a, kind_a, b, kind_b, msg)
        self.assertEqual(kind_a, kind_b, explanation)
237
238
class SignalMixin(object):
    """
    Install the reactor's SIGCHLD handler for the duration of a test.

    This class is necessary for any code which wants to use Processes
    outside the usual reactor.run() environment. It is copied from
    Twisted's twisted.test.test_process . Note that Twisted-8.2.0 uses
    something rather different.
    """
    # The handler that was in place before setUp() replaced it.
    sigchldHandler = None
    # Whether setUp() actually replaced the SIGCHLD handler.  This is tracked
    # separately because the previous handler may legitimately be falsy
    # (signal.SIG_DFL is 0) and so cannot double as the flag.
    _sigchldInstalled = False

    def setUp(self):
        # make sure SIGCHLD handler is installed, as it should be on
        # reactor.run(). problem is reactor may not have been run when this
        # test runs.
        if hasattr(reactor, "_handleSigchld") and hasattr(signal, "SIGCHLD"):
            self.sigchldHandler = signal.signal(signal.SIGCHLD,
                                                reactor._handleSigchld)
            self._sigchldInstalled = True
        return super(SignalMixin, self).setUp()

    def tearDown(self):
        if self._sigchldInstalled:
            self._sigchldInstalled = False
            # signal.signal() returns None when the previous handler was not
            # installed from Python; there is nothing that can be restored
            # in that case.
            if self.sigchldHandler is not None:
                signal.signal(signal.SIGCHLD, self.sigchldHandler)
        return super(SignalMixin, self).tearDown()
259
260
class StallMixin(object):
    """Mixin providing a way to pause a Deferred chain for a while."""

    def stall(self, res=None, delay=1):
        """
        Return a Deferred which the reactor fires with ``res`` after
        ``delay`` has passed (the value is handed to ``reactor.callLater``).
        """
        stalled = defer.Deferred()
        reactor.callLater(delay, stalled.callback, res)
        return stalled
266
267
class Marker:
    """An empty class; each instance is a distinct object."""
270
class FakeCanary(object):
    """
    A stand-in canary for use in storage tests.

    It records disconnect callbacks and lets test code trigger them.
    """

    def __init__(self, ignore_disconnectors=False):
        # When ``ignore`` is true, registration and removal do nothing.
        self.ignore = ignore_disconnectors
        # Maps each Marker to its (callable, args, kwargs) triple.  Becomes
        # None once disconnected() has run.
        self.disconnectors = {}

    def notifyOnDisconnect(self, f, *args, **kwargs):
        """
        Remember ``f(*args, **kwargs)`` for ``disconnected()`` and return the
        Marker identifying the registration (``None`` when ignoring).
        """
        if not self.ignore:
            marker = Marker()
            self.disconnectors[marker] = (f, args, kwargs)
            return marker
        return None

    def dontNotifyOnDisconnect(self, marker):
        """Forget the registration identified by ``marker``."""
        if not self.ignore:
            del self.disconnectors[marker]

    def getRemoteTubID(self):
        return None

    def getPeer(self):
        return "<fake>"

    def disconnected(self):
        """Disconnect the canary, to be called by test code.

        Can only happen once; later calls do nothing.
        """
        registered = self.disconnectors
        if registered is None:
            return
        # Iterate over a snapshot of the registered callbacks.
        for f, args, kwargs in list(registered.values()):
            f(*args, **kwargs)
        self.disconnectors = None
301
302
class ShouldFailMixin(object):
    """Mixin providing a Deferred-friendly replacement for assertRaises()."""

    def shouldFail(self, expected_failure, which, substring,
                   callable, *args, **kwargs):
        """Assert that a function call raises some exception. This is a
        Deferred-friendly version of TestCase.assertRaises() .

        Suppose you want to verify the following function:

         def broken(a, b, c):
             if a < 0:
                 raise TypeError('a must not be negative')
             return defer.succeed(b+c)

        You can use:
            d = self.shouldFail(TypeError, 'test name',
                                'a must not be negative',
                                broken, -4, 5, c=12)
        in your test method. The 'test name' string will be included in the
        error message, if any, because Deferred chains frequently make it
        difficult to tell which assertion was tripped.

        The substring= argument, if not None or empty, must appear in the
        str() of the resulting Failure, or the test will fail. A bytes
        substring is decoded as UTF-8 before the comparison.

        Returns a Deferred which fires with a one-element list holding the
        Failure, so that callers can analyze it further.
        """

        assert substring is None or isinstance(substring, (bytes, str))
        if isinstance(substring, bytes):
            # ``bytes in str`` raises TypeError, so compare as text.
            substring = substring.decode("utf-8", "replace")
        d = defer.maybeDeferred(callable, *args, **kwargs)
        def done(res):
            if isinstance(res, failure.Failure):
                # Re-raises anything which is not the expected failure.
                res.trap(expected_failure)
                if substring:
                    rendered = str(res)
                    self.assertTrue(substring in rendered,
                                    "%s: substring '%s' not in '%s'"
                                    % (which, substring, rendered))
                # return the Failure for further analysis, but in a form that
                # doesn't make the Deferred chain think that we failed.
                return [res]
            else:
                self.fail("%s was supposed to raise %s, not get '%s'" %
                          (which, expected_failure, res))
        d.addBoth(done)
        return d
346
347
class TestMixin(SignalMixin):
    """
    Test mixin which requires the reactor to hold no delayed calls when
    each test is torn down.
    """

    def setUp(self):
        return super().setUp()

    def tearDown(self):
        self.clean_pending(required_to_quiesce=True)
        return super().tearDown()

    def clean_pending(self, dummy=None, required_to_quiesce=True):
        """
        Cancel every delayed call which is still pending on the reactor.

        Whether to use this depends on the code under test:

            Must it release control once its job is done, so that it can
            never come back later (through a delayed reactor task) and do
            more work?

        If it must, trial already checks this for you: any task left pending
        on the reactor makes the test fail.

        Other code is *not* required to release control.  It may keep itself
        going indefinitely by rescheduling reactor tasks to carry out ongoing
        work, and trial will wrongly demand that it clean all of them up.

        A common view is that such code should grow an optional "shutdown"
        operation which releases all control.  To the contrary, it is good
        design for some code to have *no* shutdown operation and instead be
        "crash-only": it recovers from a crash when it next starts.

        For "long-running" code of that kind, which need not shut down
        cleanly in order to pass tests, call clean_pending() at the end of
        the unit test and trial will be satisfied.

        :param dummy: Ignored.
        :param bool required_to_quiesce: If true, fail the test when any
            delayed call was found pending.
        """
        delayed_calls = reactor.getDelayedCalls()
        had_pending = bool(delayed_calls)
        for call in delayed_calls:
            if not call.active():
                print("WEIRDNESS! pending timed call not active!")
                continue
            call.cancel()
        if had_pending and required_to_quiesce:
            self.fail("Reactor was still active when it was required to be quiescent.")
393
394
class TimezoneMixin(object):
    """Mixin for tests which need to run with a particular ``TZ`` setting."""

    def setTimezone(self, timezone):
        """
        Set the ``TZ`` environment variable to ``timezone`` for the rest of
        the test.

        A cleanup is registered with ``self.addCleanup`` which puts ``TZ``
        back the way it was (removing it if it was not set before).

        :param str timezone: The new value for ``TZ``.
        """
        def tzset_if_possible():
            # Windows doesn't have time.tzset().
            if hasattr(time, 'tzset'):
                time.tzset()

        # Sentinel distinguishing "TZ was not set" from any real value.
        unset = object()
        originalTimezone = os.environ.get('TZ', unset)
        def restoreTimezone():
            if originalTimezone is unset:
                # The test may have removed TZ itself already; pop() copes
                # with that where ``del`` would raise KeyError.
                os.environ.pop('TZ', None)
            else:
                os.environ['TZ'] = originalTimezone
            tzset_if_possible()

        os.environ['TZ'] = timezone
        self.addCleanup(restoreTimezone)
        tzset_if_possible()

    def have_working_tzset(self):
        """Return whether ``time.tzset`` exists on this platform."""
        return hasattr(time, 'tzset')
418
419
# The names exported by ``from ... import *``.  Not every definition in this
# module is listed here.
__all__ = [
    "TestMixin", "ShouldFailMixin", "StallMixin", "run_cli", "parse_cli",
    "DevNullDictionary", "insecurerandstr", "flip_bit", "flip_one_bit",
    "SignalMixin", "skip_if_cannot_represent_filename", "ReallyEqualMixin"
]
Note: See TracBrowser for help on using the repository browser.