source: trunk/integration/test_streaming_logs.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 3.5 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5from six import ensure_text
6
7import json
8
9from os.path import (
10    join,
11)
12from urllib.parse import (
13    urlsplit,
14)
15
16import attr
17
18from twisted.internet.defer import (
19    Deferred,
20)
21from twisted.internet.endpoints import (
22    HostnameEndpoint,
23)
24
25import treq
26
27from autobahn.twisted.websocket import (
28    WebSocketClientFactory,
29    WebSocketClientProtocol,
30)
31
32from allmydata.client import (
33    read_config,
34)
35from allmydata.web.private import (
36    SCHEME,
37)
38from allmydata.util.eliotutil import (
39    inline_callbacks,
40)
41
42import pytest_twisted
43
44def _url_to_endpoint(reactor, url):
45    netloc = urlsplit(url).netloc
46    host, port = netloc.split(":")
47    return HostnameEndpoint(reactor, host, int(port))
48
49
50class _StreamingLogClientProtocol(WebSocketClientProtocol):
51    def onOpen(self):
52        self.factory.on_open.callback(self)
53
54    def onMessage(self, payload, isBinary):
55        if self.on_message is None:
56            # Already did our job, ignore it
57            return
58        on_message = self.on_message
59        self.on_message = None
60        on_message.callback(payload)
61
62    def onClose(self, wasClean, code, reason):
63        self.on_close.callback(reason)
64
65
66def _connect_client(reactor, api_auth_token, ws_url):
67    factory = WebSocketClientFactory(
68        url=ws_url,
69        headers={
70            "Authorization": "{} {}".format(str(SCHEME, "ascii"), api_auth_token),
71        }
72    )
73    factory.protocol = _StreamingLogClientProtocol
74    factory.on_open = Deferred()
75
76    endpoint = _url_to_endpoint(reactor, ws_url)
77    return endpoint.connect(factory)
78
79
80def _race(left, right):
81    """
82    Wait for the first result from either of two Deferreds.
83
84    Any result, success or failure, causes the return Deferred to fire.  It
85    fires with either a Left or a Right instance depending on whether the left
86    or right argument fired first.
87
88    The Deferred that loses the race is cancelled and any result it eventually
89    produces is discarded.
90    """
91    racing = [True]
92    def got_result(result, which):
93        if racing:
94            racing.pop()
95            loser = which.pick(left, right)
96            loser.cancel()
97            finished.callback(which(result))
98
99    finished = Deferred()
100    left.addBoth(got_result, Left)
101    right.addBoth(got_result, Right)
102    return finished
103
104
105@attr.s
106class Left(object):
107    value = attr.ib()
108
109    @classmethod
110    def pick(cls, left, right):
111        return left
112
113
114@attr.s
115class Right(object):
116    value = attr.ib()
117
118    @classmethod
119    def pick(cls, left, right):
120        return right
121
122
123@inline_callbacks
124def _test_streaming_logs(reactor, temp_dir, alice):
125    cfg = read_config(join(temp_dir, "alice"), "portnum")
126    node_url = cfg.get_config_from_file("node.url")
127    api_auth_token = cfg.get_private_config("api_auth_token")
128
129    ws_url = ensure_text(node_url).replace("http://", "ws://")
130    log_url = ws_url + "private/logs/v1"
131
132    print("Connecting to {}".format(log_url))
133    client = yield _connect_client(reactor, api_auth_token, log_url)
134    print("Connected.")
135    client.on_close = Deferred()
136    client.on_message = Deferred()
137
138    # Capture this now before on_message perhaps goes away.
139    racing = _race(client.on_close, client.on_message)
140
141    # Provoke _some_ log event.
142    yield treq.get(node_url)
143
144    result = yield racing
145
146    assert isinstance(result, Right)
147    json.loads(result.value)
148
149
150@pytest_twisted.inlineCallbacks
151def test_streaming_logs(reactor, temp_dir, alice):
152    yield _test_streaming_logs(reactor, temp_dir, alice)
Note: See TracBrowser for help on using the repository browser.