1 | """ |
---|
2 | Ported to Python 3. |
---|
3 | """ |
---|
4 | |
---|
5 | from six import ensure_text |
---|
6 | |
---|
7 | import json |
---|
8 | |
---|
9 | from os.path import ( |
---|
10 | join, |
---|
11 | ) |
---|
12 | from urllib.parse import ( |
---|
13 | urlsplit, |
---|
14 | ) |
---|
15 | |
---|
16 | import attr |
---|
17 | |
---|
18 | from twisted.internet.defer import ( |
---|
19 | Deferred, |
---|
20 | ) |
---|
21 | from twisted.internet.endpoints import ( |
---|
22 | HostnameEndpoint, |
---|
23 | ) |
---|
24 | |
---|
25 | import treq |
---|
26 | |
---|
27 | from autobahn.twisted.websocket import ( |
---|
28 | WebSocketClientFactory, |
---|
29 | WebSocketClientProtocol, |
---|
30 | ) |
---|
31 | |
---|
32 | from allmydata.client import ( |
---|
33 | read_config, |
---|
34 | ) |
---|
35 | from allmydata.web.private import ( |
---|
36 | SCHEME, |
---|
37 | ) |
---|
38 | from allmydata.util.eliotutil import ( |
---|
39 | inline_callbacks, |
---|
40 | ) |
---|
41 | |
---|
42 | import pytest_twisted |
---|
43 | |
---|
44 | def _url_to_endpoint(reactor, url): |
---|
45 | netloc = urlsplit(url).netloc |
---|
46 | host, port = netloc.split(":") |
---|
47 | return HostnameEndpoint(reactor, host, int(port)) |
---|
48 | |
---|
49 | |
---|
class _StreamingLogClientProtocol(WebSocketClientProtocol):
    """
    WebSocket client protocol that reports its events through Deferreds.

    ``onOpen`` fires the factory's ``on_open`` Deferred with this protocol.
    The ``on_message`` and ``on_close`` attributes are not created by this
    class; they must be assigned on the instance before the corresponding
    event is delivered.
    """

    def onOpen(self):
        # Hand this protocol to whoever is waiting on the factory.
        self.factory.on_open.callback(self)

    def onMessage(self, payload, isBinary):
        # Take the pending Deferred and clear the attribute in one step, so
        # only the first message is delivered and later ones are ignored.
        waiting, self.on_message = self.on_message, None
        if waiting is not None:
            waiting.callback(payload)

    def onClose(self, wasClean, code, reason):
        # Only the close reason is reported; wasClean and code are unused.
        self.on_close.callback(reason)
---|
64 | |
---|
65 | |
---|
def _connect_client(reactor, api_auth_token, ws_url):
    """
    Start a WebSocket client connection to ``ws_url``.

    The request carries an ``Authorization`` header made of the ``SCHEME``
    constant (decoded as ASCII), a space, and ``api_auth_token``.  The
    factory is given a fresh ``on_open`` Deferred for the protocol to fire.

    :param reactor: The reactor used to make the connection.
    :param api_auth_token: The token placed in the ``Authorization`` header.
    :param ws_url: The WebSocket URL; also used to derive the host and port
        to connect to.

    :return: Whatever the endpoint's ``connect`` returns for the factory
        (the caller yields it and uses the result as the client protocol).
    """
    authorization = "{} {}".format(str(SCHEME, "ascii"), api_auth_token)
    client_factory = WebSocketClientFactory(
        url=ws_url,
        headers={"Authorization": authorization},
    )
    client_factory.protocol = _StreamingLogClientProtocol
    client_factory.on_open = Deferred()

    return _url_to_endpoint(reactor, ws_url).connect(client_factory)
---|
78 | |
---|
79 | |
---|
def _race(left, right):
    """
    Wait for the first result from either of two Deferreds.

    Any result, success or failure, causes the returned Deferred to fire.  It
    fires with either a Left or a Right instance depending on whether the left
    or right argument fired first.

    The Deferred that loses the race is cancelled and any result it eventually
    produces is discarded.

    :param Deferred left: The Deferred whose result is reported as ``Left``.
    :param Deferred right: The Deferred whose result is reported as ``Right``.

    :return Deferred: Fires with ``Left(result)`` or ``Right(result)`` for
        whichever argument produced a result first.
    """
    finished = Deferred()
    # A one-element list serves as a mutable "race still open" flag that the
    # nested callback can clear.
    racing = [True]

    def got_result(result, which):
        if not racing:
            # The race is already decided: drop this late result.  Returning
            # None also consumes a Failure so it is not reported as unhandled.
            return
        # Close the race *before* cancelling: cancelling the loser may run
        # this callback again for it right away, and that call must be
        # treated as a late result.
        racing.pop()
        # ``which`` names the side that just fired (the winner), so the loser
        # is the other Deferred.  Asking ``which.pick(left, right)`` here
        # would return the winner, whose ``cancel()`` is a no-op.
        loser = right if which is Left else left
        loser.cancel()
        finished.callback(which(result))

    left.addBoth(got_result, Left)
    right.addBoth(got_result, Right)
    return finished
---|
103 | |
---|
104 | |
---|
@attr.s
class Left:
    """
    Wrapper tagging ``value`` as coming from the left-hand side of a pair.
    """

    # The wrapped value.
    value = attr.ib()

    @classmethod
    def pick(cls, left, right):
        """
        Return the ``left`` argument; ``right`` is ignored.
        """
        return left
---|
112 | |
---|
113 | |
---|
@attr.s
class Right:
    """
    Wrapper tagging ``value`` as coming from the right-hand side of a pair.
    """

    # The wrapped value.
    value = attr.ib()

    @classmethod
    def pick(cls, left, right):
        """
        Return the ``right`` argument; ``left`` is ignored.
        """
        return right
---|
121 | |
---|
122 | |
---|
@inline_callbacks
def _test_streaming_logs(reactor, temp_dir, alice):
    """
    Check that the node's streaming log WebSocket delivers a JSON message.

    Reads the node URL and API auth token from the configuration under
    ``<temp_dir>/alice``, connects to ``private/logs/v1`` over ``ws://``,
    issues an HTTP GET against the node, and then requires that a message
    arrives before the connection closes and that it parses as JSON.

    :param reactor: The reactor used for the WebSocket connection.
    :param temp_dir: Directory containing the ``alice`` node directory.
    :param alice: Not used by the body (presumably a fixture that makes sure
        the node is running -- confirm against the fixture definition).
    """
    node_config = read_config(join(temp_dir, "alice"), "portnum")
    node_url = node_config.get_config_from_file("node.url")
    api_auth_token = node_config.get_private_config("api_auth_token")

    # Same location as the node's web root, but with the WebSocket scheme.
    websocket_root = ensure_text(node_url).replace("http://", "ws://")
    log_url = websocket_root + "private/logs/v1"

    print("Connecting to {}".format(log_url))
    client = yield _connect_client(reactor, api_auth_token, log_url)
    print("Connected.")

    closed = Deferred()
    received = Deferred()
    client.on_close = closed
    client.on_message = received

    # Start the race now, holding our own references, because the protocol
    # clears its ``on_message`` attribute once a message has been delivered.
    outcome = _race(closed, received)

    # Provoke _some_ log event.
    yield treq.get(node_url)

    winner = yield outcome

    # A message (right) must win over the connection closing (left).
    assert isinstance(winner, Right)
    json.loads(winner.value)
---|
148 | |
---|
149 | |
---|
@pytest_twisted.inlineCallbacks
def test_streaming_logs(reactor, temp_dir, alice):
    """
    pytest entry point: run ``_test_streaming_logs`` with the same arguments
    and wait for it to finish.
    """
    outcome = _test_streaming_logs(reactor, temp_dir, alice)
    yield outcome
---|