source: trunk/src/allmydata/test/cli/wormholetesting.py

Last change on this file was aa2bce4, checked in by Florian Sesser <florian@…>, at 2025-10-08T13:09:20Z

Pacify mypy

  • Property mode set to 100644
File size: 11.2 KB
Line 
1"""
2An in-memory implementation of some of the magic-wormhole interfaces for
3use by automated tests.
4
5For example::
6
7    async def peerA(mw):
8        wormhole = mw.create("myapp", "wss://myserver", reactor)
9        code = await wormhole.get_code()
10        print(f"I have a code: {code}")
11        message = await wormhole.when_received()
12        print(f"I have a message: {message}")
13
14    async def local_peerB(helper, mw):
15        peerA_wormhole = await helper.wait_for_wormhole("myapp", "wss://myserver")
16        code = await peerA_wormhole.when_code()
17
        peerB_wormhole = mw.create("myapp", "wss://myserver", reactor)
19        peerB_wormhole.set_code(code)
20
21        peerB_wormhole.send_message("Hello, peer A")
22
23    # Run peerA against local_peerB with pure in-memory message passing.
24    server, helper = memory_server()
25    run(gather(peerA(server), local_peerB(helper, server)))
26
27    # Run peerA against a peerB somewhere out in the world, using a real
28    # wormhole relay server somewhere.
29    import wormhole
30    run(peerA(wormhole))
31"""
32
33from __future__ import annotations
34
# Names exported by ``from <this module> import *``.  ``IWormhole`` is not
# defined here; it is re-exported from the ``wormhole._interfaces`` import
# below.
__all__ = ['MemoryWormholeServer', 'TestingHelper', 'memory_server', 'IWormhole']
36
37from typing import Iterator, Optional, List, Tuple, Any, TextIO, Callable
38import inspect
39from itertools import count
40from sys import stderr
41
42from attrs import frozen, define, field, Factory
43from twisted.internet.defer import Deferred, DeferredQueue, succeed
44from wormhole._interfaces import IWormhole
45from wormhole.wormhole import create
46from zope.interface import implementer
47
# Plain aliases that name the role each value plays in the annotations used
# throughout this module.
WormholeCode = str
WormholeMessage = bytes
AppId = str
RelayURL = str
# Identifies one appid/relay_url scope.  Note the order: the relay URL comes
# first, then the application id.
ApplicationKey = Tuple[RelayURL, AppId]
53
@define
class MemoryWormholeServer:
    """
    Hand out in-memory wormholes which can talk to one another.

    :ivar _apps: Per-scope wormhole state, keyed by the (relay URL,
        application id) pair that the state belongs to.

    :ivar _waiters: For each (relay URL, application id) pair, the observer
        (at most one) waiting for a wormhole to be created in that scope.
    """
    _apps: dict[ApplicationKey, _WormholeApp] = field(default=Factory(dict))
    _waiters: dict[ApplicationKey, Deferred[IWormhole]] = field(default=Factory(dict))

    def create(
        self,
        appid: str,
        relay_url: str,
        reactor: Any,
        # Unfortunately we need a mutable default to match the real API
        versions: Any={},  # noqa: B006
        delegate: Optional[Any]=None,
        journal: Optional[Any]=None,
        tor: Optional[Any]=None,
        timing: Optional[Any]=None,
        stderr: TextIO=stderr,
        _eventual_queue: Optional[Any]=None,
        _enable_dilate: bool=False,
        on_status_update: Optional[Callable[[Any], None]]=None,
    ) -> _MemoryWormhole:
        """
        Make a new wormhole in the scope named by ``appid`` and ``relay_url``.

        The result can rendezvous with other wormholes created by this same
        server instance in the same appid/relay_url scope.  If an observer is
        waiting for a wormhole in that scope, it is removed and fired with the
        new wormhole.

        Only ``appid``, ``relay_url``, ``tor`` and ``_enable_dilate`` are
        looked at; the remaining parameters exist so that the signature lines
        up with the real ``wormhole.create``.

        :raise ValueError: If ``tor`` is given or ``_enable_dilate`` is true;
            neither is supported here.
        """
        if tor is not None:
            raise ValueError("Cannot deal with Tor right now.")
        if _enable_dilate:
            raise ValueError("Cannot deal with dilation right now.")

        scope = (relay_url, appid)
        created = _MemoryWormhole(self._view(scope))
        # A waiter is single-use: take it out of the table before firing it.
        waiter = self._waiters.pop(scope, None)
        if waiter is not None:
            waiter.callback(created)
        return created

    def _view(self, key: ApplicationKey) -> _WormholeServerView:
        """
        Create a view onto this server's state which is restricted to a
        single appid/relay_url pair.
        """
        return _WormholeServerView(self, key)
106
107
@frozen
class TestingHelper:
    """
    Extra, test-only operations on an in-memory wormhole server.

    These deliberately live on a separate object so that they cannot be
    mistaken for part of the public interface of the real wormhole
    implementation.
    """
    _server: MemoryWormholeServer

    async def wait_for_wormhole(self, appid: AppId, relay_url: RelayURL) -> IWormhole:
        """
        Wait until a wormhole is created at a particular location.

        :param appid: The appid that the resulting wormhole will have.

        :param relay_url: The URL of the relay at which the resulting wormhole
            will presume to be created.

        :raise ValueError: If a waiter is already registered for the same
            appid/relay_url pair.

        :return: The next wormhole created on the server with the given
            parameters.
        """
        key = (relay_url, appid)
        registered = self._server._waiters
        if key in registered:
            raise ValueError(f"There is already a waiter for {key}")
        # The server's ``create`` pops this Deferred and fires it.
        pending: Deferred[IWormhole] = Deferred()
        registered[key] = pending
        return await pending
138
139
def _verify() -> None:
    """
    Sanity-check that ``MemoryWormholeServer.create`` still looks roughly like
    the real ``wormhole.create`` which it stands in for.

    :raise AssertionError: If the two signatures have drifted too far apart.
    """
    # Poor man's interface verification.
    real_spec = inspect.getfullargspec(create)
    fake_spec = inspect.getfullargspec(MemoryWormholeServer.create)
    # The fake is a method: discard its leading `self` before comparing.
    fake_spec = fake_spec._replace(args=fake_spec.args[1:])

    # These comparisons are *very* specific and might be removed one day; the
    # in-memory ersatz does not need to be *exactly* the same as the live
    # Magic Wormhole.
    assert real_spec.varkw == fake_spec.varkw
    assert real_spec.varargs == fake_spec.varargs
    assert real_spec.kwonlydefaults == fake_spec.kwonlydefaults

    # This check used to be strict.  It has to pass against several versions
    # of Magic Wormhole - with and without Dilation, i.e. before and after
    # 0.19 - whose APIs differ, so only require that the argument names do
    # not differ "too much".  The allowed number of differences may need to
    # grow later.
    differing = set(real_spec.args).symmetric_difference(fake_spec.args)
    assert len(differing) < 3

    # The part that is actually relied upon is the required interface, and
    # that must be the same on both sides.
    def required_args(func: Callable[..., Any]) -> List[str]:
        names = []
        for name, param in inspect.signature(func).parameters.items():
            # Required means: no default, and not *args / **kwargs.
            if param.default is not param.empty:
                continue
            if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
                continue
            # And we don't count 'self'.
            if param.name == 'self':
                continue
            names.append(name)
        return names

    real_required = required_args(create)
    fake_required = required_args(MemoryWormholeServer.create)
    assert real_required == fake_required
180
181
182
# Run the signature check at import time, so that a mismatch with the
# installed magic-wormhole makes importing this module fail with an
# AssertionError.
_verify()
184
185
@define
class _WormholeApp:
    """
    Represent a collection of wormholes that belong to the same
    appid/relay_url scope.

    :ivar wormholes: For each code, every wormhole that has been associated
        with that code, in the order they were associated.

    :ivar _waiting: For each code, the ``Deferred``s to fire with the next
        wormhole that gets associated with that code.

    :ivar _counter: Source of the numeric prefix for generated codes.
    """
    # Each code maps to a *list* of wormholes (see ``allocate_code``), not to
    # a single wormhole.
    wormholes: dict[WormholeCode, List[IWormhole]] = field(default=Factory(dict))
    _waiting: dict[WormholeCode, List[Deferred[_MemoryWormhole]]] = field(default=Factory(dict))
    _counter: Iterator[int] = field(default=Factory(count))

    def allocate_code(self, wormhole: IWormhole, code: Optional[WormholeCode]) -> WormholeCode:
        """
        Associate the given wormhole with a code, generating a new code if
        none is supplied.

        Anything waiting (via ``wait_for_wormhole``) for a wormhole on that
        code is fired with ``wormhole`` and forgotten.

        Code generation logic is trivial and certainly not good enough for any
        real use.  It is sufficient for automated testing, though.

        :param wormhole: The wormhole to record under the code.

        :param code: The code to use, or ``None`` to have one generated.

        :return: The code the wormhole is now associated with.
        """
        if code is None:
            code = f"{next(self._counter)}-persnickety-tardigrade"
        self.wormholes.setdefault(code, []).append(wormhole)
        # Remove the waiters before firing them, so each fires at most once.
        for waiter in self._waiting.pop(code, []):
            waiter.callback(wormhole)
        return code

    def wait_for_wormhole(self, code: WormholeCode) -> Deferred[_MemoryWormhole]:
        """
        Return a ``Deferred`` which fires with the next wormhole to be associated
        with the given code.  This is used to let the first end of a wormhole
        rendezvous with the second end.
        """
        d: Deferred[_MemoryWormhole] = Deferred()
        self._waiting.setdefault(code, []).append(d)
        return d
228
229
@frozen
class _WormholeServerView:
    """
    Present an interface onto the server to be consumed by individual
    wormholes.

    :ivar _server: The server whose state is being viewed.

    :ivar _key: The appid/relay_url scope this view is restricted to.
    """
    _server: MemoryWormholeServer
    _key: ApplicationKey

    def allocate_code(self, wormhole: _MemoryWormhole, code: Optional[WormholeCode]) -> WormholeCode:
        """
        Allocate a new code for the given wormhole in the scope associated with
        this view.

        The scope's state is created on first use.

        :param code: The code to use, or ``None`` to have one generated.

        :return: The code the wormhole is now associated with.
        """
        app = self._server._apps.setdefault(self._key, _WormholeApp())
        return app.allocate_code(wormhole, code)

    def wormhole_by_code(self, code: WormholeCode, exclude: object) -> Deferred[IWormhole]:
        """
        Find the wormhole at the other end of ``code``.

        :param code: A code which has already been associated with at least
            one wormhole in this scope.

        :param exclude: The caller's own wormhole, which must not be returned
            as its own peer.

        :raise KeyError: If nothing has been associated with this scope or
            with ``code`` yet.

        :return: A ``Deferred`` that has already fired with the peer when
            exactly one wormhole other than ``exclude`` is associated with
            ``code``.  Otherwise, a ``Deferred`` which fires with the next
            wormhole to be associated with ``code``.
        """
        app = self._server._apps[self._key]
        # Compare by identity: the wormhole class gets a generated, field-wise
        # ``__eq__`` and "is this my own end?" must not depend on field values.
        peers = [w for w in app.wormholes[code] if w is not exclude]
        if len(peers) == 1:
            return succeed(peers[0])
        return app.wait_for_wormhole(code)
258
259
@implementer(IWormhole)
@define
class _MemoryWormhole:
    """
    Represent one side of a wormhole as conceived by ``MemoryWormholeServer``.

    :ivar _view: The scoped view of the server this wormhole belongs to.

    :ivar _code: This wormhole's code, or ``None`` until one is allocated or
        set.

    :ivar _payload: Messages sent *from* this side.  The peer reads them out
        of here.

    :ivar _waiting_for_code: ``Deferred``s returned by ``when_code`` before
        the code was known.
    """

    _view: _WormholeServerView
    _code: Optional[WormholeCode] = None
    _payload: DeferredQueue[WormholeMessage] = field(default=Factory(DeferredQueue))
    _waiting_for_code: list[Deferred[WormholeCode]] = field(default=Factory(list))

    def _fire_code_waiters(self) -> None:
        """
        Deliver the now-known code to everyone who asked for it via
        ``when_code`` before it was known.
        """
        # Swap the list out first so each waiter is fired exactly once.
        waiters, self._waiting_for_code = self._waiting_for_code, []
        for d in waiters:
            d.callback(self._code)

    def allocate_code(self) -> None:
        """
        Have a fresh code generated for this wormhole.

        :raise ValueError: If this wormhole already has a code.
        """
        if self._code is not None:
            raise ValueError(
                "allocate_code used with a wormhole which already has a code"
            )
        self._code = self._view.allocate_code(self, None)
        self._fire_code_waiters()

    def set_code(self, code: WormholeCode) -> None:
        """
        Use the given code for this wormhole.

        :raise ValueError: If this wormhole already has a code.
        """
        if self._code is not None:
            raise ValueError("set_code used with a wormhole which already has a code")
        self._code = code
        self._view.allocate_code(self, code)
        # ``when_code`` callers must hear about the code whichever way it
        # became known, not only when it came from ``allocate_code``.
        self._fire_code_waiters()

    def when_code(self) -> Deferred[WormholeCode]:
        """
        Return a ``Deferred`` that fires with this wormhole's code: already
        fired if the code is known, otherwise once it becomes known.
        """
        if self._code is None:
            d: Deferred[WormholeCode] = Deferred()
            self._waiting_for_code.append(d)
            return d
        return succeed(self._code)

    def get_welcome(self) -> Deferred[str]:
        """
        Return an already-fired ``Deferred`` carrying a fixed placeholder
        welcome value.
        """
        return succeed("welcome")

    def send_message(self, payload: WormholeMessage) -> None:
        """
        Queue a message for the peer.  It sits in this side's own queue until
        the peer asks for it.
        """
        self._payload.put(payload)

    def when_received(self) -> Deferred[WormholeMessage]:
        """
        Return a ``Deferred`` that fires with the next message sent by the
        peer.

        :raise ValueError: If this wormhole does not have a code yet.
        """
        if self._code is None:
            raise ValueError(
                "This implementation requires set_code or allocate_code "
                "before when_received."
            )
        d = self._view.wormhole_by_code(self._code, exclude=self)

        def got_wormhole(wormhole: _MemoryWormhole) -> Deferred[WormholeMessage]:
            # Read from the *peer's* outgoing queue.
            msg: Deferred[WormholeMessage] = wormhole._payload.get()
            return msg

        d.addCallback(got_wormhole)
        return d

    get_message = when_received

    def close(self) -> None:
        """
        Do nothing; there is nothing to release.
        """
        pass

    # 0.9.2 compatibility
    def get_code(self) -> Deferred[WormholeCode]:
        """
        Return a ``Deferred`` that fires with this wormhole's code, allocating
        a code first if there is none yet.
        """
        if self._code is None:
            self.allocate_code()
        return self.when_code()

    get = when_received
330
331
def memory_server() -> tuple[MemoryWormholeServer, TestingHelper]:
    """
    Build a fresh in-memory wormhole server together with the testing helper
    bound to it.

    :return: A ``(server, helper)`` pair.
    """
    wormhole_server = MemoryWormholeServer()
    helper = TestingHelper(wormhole_server)
    return (wormhole_server, helper)
Note: See TracBrowser for help on using the repository browser.