1 | """ |
---|
2 | Ported to Python 3. |
---|
3 | """ |
---|
4 | from __future__ import annotations |
---|
5 | |
---|
6 | import os |
---|
7 | from struct import ( |
---|
8 | pack, |
---|
9 | ) |
---|
10 | from functools import ( |
---|
11 | partial, |
---|
12 | ) |
---|
13 | |
---|
14 | import attr |
---|
15 | |
---|
16 | from twisted.internet import defer |
---|
17 | from twisted.trial import unittest |
---|
18 | from twisted.application import service |
---|
19 | |
---|
20 | from foolscap.api import Tub, fireEventually, flushEventualQueue |
---|
21 | |
---|
22 | from eliot.twisted import ( |
---|
23 | inline_callbacks, |
---|
24 | ) |
---|
25 | |
---|
26 | from allmydata.introducer.client import IntroducerClient |
---|
27 | from allmydata.crypto import aes |
---|
28 | from allmydata.storage.server import ( |
---|
29 | si_b2a, |
---|
30 | StorageServer, |
---|
31 | FoolscapStorageServer, |
---|
32 | ) |
---|
33 | from allmydata.storage_client import StorageFarmBroker |
---|
34 | from allmydata.immutable.layout import ( |
---|
35 | make_write_bucket_proxy, |
---|
36 | ) |
---|
37 | from allmydata.immutable import offloaded, upload |
---|
38 | from allmydata import uri, client |
---|
39 | from allmydata.util import hashutil, fileutil, mathutil, dictutil |
---|
40 | |
---|
41 | from .no_network import ( |
---|
42 | NoNetworkServer, |
---|
43 | LocalWrapper, |
---|
44 | fireNow, |
---|
45 | ) |
---|
46 | from .common import ( |
---|
47 | EMPTY_CLIENT_CONFIG, |
---|
48 | SyncTestCase, |
---|
49 | ) |
---|
50 | |
---|
51 | from testtools.matchers import ( |
---|
52 | Equals, |
---|
53 | MatchesListwise, |
---|
54 | IsInstance, |
---|
55 | ) |
---|
56 | from testtools.twistedsupport import ( |
---|
57 | succeeded, |
---|
58 | ) |
---|
59 | |
---|
# Number of bytes in one mebibyte.
MiB = 2 ** 20

# Plaintext payload uploaded by the tests in this module.
DATA = 1000 * b"I need help\n"
---|
63 | |
---|
class CHKUploadHelper_fake(offloaded.CHKUploadHelper):
    """
    A ``CHKUploadHelper`` whose ``start_encrypted`` fabricates upload results
    instead of uploading anything.
    """
    def start_encrypted(self, eu):
        """
        Ask ``eu`` for its size and then for its encoding parameters, build
        ``UploadResults`` from those values, record them on
        ``self._upload_status`` and return a Deferred firing with them.
        """
        def _fabricate_results(parms, size):
            # just pretend we did the upload
            needed_shares, _happy, total_shares, segsize = parms
            verifycap = uri.CHKFileVerifierURI(
                self._storage_index,
                b"x"*32,
                needed_shares,
                total_shares,
                size,
            )
            results = upload.UploadResults(
                file_size=size,
                ciphertext_fetched=0,
                preexisting_shares=0,
                pushed_shares=total_shares,
                sharemap={},
                servermap={},
                timings={},
                uri_extension_data={
                    "needed_shares": needed_shares,
                    "total_shares": total_shares,
                    "segment_size": segsize,
                    "size": size,
                },
                uri_extension_hash=b"fake",
                verifycapstr=verifycap.to_string(),
            )
            self._upload_status.set_results(results)
            return results

        def _fetch_parms(size):
            # The encoding parameters are only requested once the size is
            # known; the size is forwarded as an extra callback argument.
            parms_d = eu.get_all_encoding_parameters()
            parms_d.addCallback(_fabricate_results, size)
            return parms_d

        size_d = eu.get_size()
        size_d.addCallback(_fetch_parms)
        return size_d
---|
97 | |
---|
@attr.s
class FakeCHKCheckerAndUEBFetcher(object):
    """
    Stand-in for ``CHKCheckerAndUEBFetcher`` whose ``check`` always succeeds
    with the share map and UEB data it was constructed with.
    """
    # Accepted and stored, but not read by any method of this class.
    peer_getter = attr.ib()
    storage_index = attr.ib()
    logparent = attr.ib()

    # The canned values reported by ``check``.
    _sharemap = attr.ib()
    _ueb_data = attr.ib()

    @property
    def _ueb_hash(self):
        """
        The URI extension hash computed over the packed form of ``_ueb_data``.
        """
        packed = uri.pack_extension(self._ueb_data)
        return hashutil.uri_extension_hash(packed)

    def check(self):
        """
        :return: An already-fired Deferred whose result is the tuple
            ``(sharemap, ueb_data, ueb_hash)``.
        """
        canned = (self._sharemap, self._ueb_data, self._ueb_hash)
        return defer.succeed(canned)
---|
122 | |
---|
class FakeClient(service.MultiService):
    """
    A ``MultiService`` exposing fixed encoding parameters and a storage
    broker.  ``storage_broker`` is not defined by this class; it must be
    assigned on the instance before ``get_storage_broker`` is called.
    """
    introducer_clients : list[IntroducerClient] = []
    DEFAULT_ENCODING_PARAMETERS = {
        "k": 25,
        "happy": 75,
        "n": 100,
        "max_segment_size": 1*MiB,
    }

    def get_encoding_parameters(self):
        """
        Return the ``DEFAULT_ENCODING_PARAMETERS`` dict itself (not a copy).
        """
        parameters = self.DEFAULT_ENCODING_PARAMETERS
        return parameters

    def get_storage_broker(self):
        """
        Return whatever was assigned to ``self.storage_broker``.
        """
        broker = self.storage_broker
        return broker
---|
135 | |
---|
def flush_but_dont_ignore(res):
    """
    Flush the eventual queue and then pass ``res`` through unchanged.

    :return: The Deferred from ``flushEventualQueue`` with a callback added
        that replaces its result with ``res``.
    """
    flushed = flushEventualQueue()
    flushed.addCallback(lambda _ignored: res)
    return flushed
---|
142 | |
---|
def wait_a_few_turns(ignored=None):
    """
    Return a Deferred built from six chained ``fireEventually`` calls: one
    initial call followed by five more added as callbacks.

    :param ignored: Unused; lets this function be used as a callback.
    """
    turns = fireEventually()
    for _ in range(5):
        turns.addCallback(fireEventually)
    return turns
---|
151 | |
---|
def upload_data(uploader, data, convergence):
    """
    Wrap ``data`` in an ``upload.Data`` uploadable with the given convergence
    value and hand it to ``uploader.upload``, returning that call's result.
    """
    return uploader.upload(upload.Data(data, convergence=convergence))
---|
155 | |
---|
156 | |
---|
def make_uploader(helper_furl, parent, override_name=None):
    """
    Create an ``upload.Uploader`` for the given helper and attach it to the
    given service parent.

    :param bytes helper_furl: The Foolscap URL of the upload helper.

    :param IServiceCollection parent: The service the new uploader is
        parented to.

    :param str override_name: If not ``None``, the name to give the uploader
        service before it is attached.  Multiple services cannot coexist
        with the same name.

    :return: The new, already-parented uploader.
    """
    uploader = upload.Uploader(helper_furl)
    # The name has to be in place before the service joins its parent.
    if override_name is not None:
        uploader.name = override_name
    uploader.setServiceParent(parent)
    return uploader
---|
174 | |
---|
175 | |
---|
class AssistedUpload(unittest.TestCase):
    """
    Tests for uploads made with ``upload.Uploader`` through an
    ``offloaded.Helper`` registered on a local ``Tub``.
    """
    def setUp(self):
        """
        Start a ``FakeClient`` service carrying a storage broker, a secret
        holder and a ``Tub``.
        """
        self.tub = t = Tub()
        t.setOption("expose-remote-exception-types", False)
        self.s = FakeClient()
        self.s.storage_broker = StorageFarmBroker(
            True,
            lambda h: self.tub,
            EMPTY_CLIENT_CONFIG,
        )
        self.s.secret_holder = client.SecretHolder(b"lease secret", b"converge")
        self.s.startService()

        t.setServiceParent(self.s)
        self.s.tub = t
        # we never actually use this for network traffic, so it can use a
        # bogus host/port
        t.setLocation(b"bogus:1234")

    def setUpHelper(self, basedir, chk_upload=CHKUploadHelper_fake, chk_checker=None):
        """
        Create an ``offloaded.Helper`` rooted at ``basedir`` and register it
        with the Tub, storing the resulting furl as ``self.helper_furl``.

        :param str basedir: Directory for the helper; created if missing.

        :param chk_upload: If not ``None``, replaces the helper's
            ``chk_upload`` attribute.

        :param chk_checker: If not ``None``, replaces the helper's
            ``chk_checker`` attribute.
        """
        fileutil.make_dirs(basedir)
        self.helper = offloaded.Helper(
            basedir,
            self.s.storage_broker,
            self.s.secret_holder,
            None,
            None,
        )
        if chk_upload is not None:
            self.helper.chk_upload = chk_upload
        if chk_checker is not None:
            self.helper.chk_checker = chk_checker
        self.helper_furl = self.tub.registerReference(self.helper)

    def tearDown(self):
        """
        Stop the client service and flush the eventual queue, passing the
        outcome of the shutdown (success or failure) through.
        """
        d = self.s.stopService()
        d.addCallback(fireEventually)
        d.addBoth(flush_but_dont_ignore)
        return d

    def _assert_has_helper(self, uploader):
        """
        Fail the test unless ``uploader._helper`` is truthy.
        """
        self.assertTrue(
            uploader._helper,
            "Expected uploader to have a helper reference, had {} instead.".format(
                uploader._helper,
            ),
        )

    def _assert_no_intermediate_files(self):
        """
        Fail the test if either working directory under ``self.basedir``
        still contains any entries.
        """
        for subdir in ("CHK_encoding", "CHK_incoming"):
            files = os.listdir(os.path.join(self.basedir, subdir))
            self.assertEqual(files, [])

    def test_one(self):
        """
        Some data that has never been uploaded before can be uploaded in CHK
        format using the ``RIHelper`` provider and ``Uploader.upload``.
        """
        self.basedir = "helper/AssistedUpload/test_one"
        self.setUpHelper(self.basedir)
        u = make_uploader(self.helper_furl, self.s)

        d = wait_a_few_turns()

        def _ready(res):
            self._assert_has_helper(u)
            return upload_data(u, DATA, convergence=b"some convergence string")
        d.addCallback(_ready)

        def _uploaded(results):
            the_uri = results.get_uri()
            self.assertIn(b"CHK", the_uri)
            self.assertNotEqual(
                results.get_pushed_shares(),
                0,
            )
        d.addCallback(_uploaded)

        def _check_empty(res):
            # Make sure the intermediate artifacts aren't left lying around.
            self._assert_no_intermediate_files()
        d.addCallback(_check_empty)

        return d

    @inline_callbacks
    def test_concurrent(self):
        """
        The same data can be uploaded by more than one ``Uploader`` at a time.
        """
        self.basedir = "helper/AssistedUpload/test_concurrent"
        self.setUpHelper(self.basedir)
        u1 = make_uploader(self.helper_furl, self.s, "u1")
        u2 = make_uploader(self.helper_furl, self.s, "u2")

        yield wait_a_few_turns()

        for u in [u1, u2]:
            self._assert_has_helper(u)

        # Start both uploads before waiting on either of them.
        uploads = [
            upload_data(u, DATA, convergence=b"some convergence string")
            for u
            in [u1, u2]
        ]

        result1, result2 = yield defer.gatherResults(uploads)

        self.assertEqual(
            result1.get_uri(),
            result2.get_uri(),
        )
        # It would be really cool to assert that result1.get_pushed_shares() +
        # result2.get_pushed_shares() == total_shares here.  However, we're
        # faking too much for that to be meaningful here.  Also it doesn't
        # hold because we don't actually push _anything_, we just lie about
        # having pushed stuff.

    def test_previous_upload_failed(self):
        """
        Ciphertext left behind in ``CHK_encoding/`` by an earlier attempt does
        not prevent a later upload of the same file from succeeding, and the
        leftover is gone afterwards.
        """
        self.basedir = "helper/AssistedUpload/test_previous_upload_failed"
        self.setUpHelper(self.basedir)

        # we want to make sure that an upload which fails (leaving the
        # ciphertext in the CHK_encoding/ directory) does not prevent a later
        # attempt to upload that file from working. We simulate this by
        # populating the directory manually. The hardest part is guessing the
        # storage index.

        k = FakeClient.DEFAULT_ENCODING_PARAMETERS["k"]
        n = FakeClient.DEFAULT_ENCODING_PARAMETERS["n"]
        max_segsize = FakeClient.DEFAULT_ENCODING_PARAMETERS["max_segment_size"]
        segsize = min(max_segsize, len(DATA))
        # this must be a multiple of 'required_shares'==k
        segsize = mathutil.next_multiple(segsize, k)

        key = hashutil.convergence_hash(k, n, segsize, DATA, b"test convergence string")
        self.assertEqual(len(key), 16)
        encryptor = aes.create_encryptor(key)
        SI = hashutil.storage_index_hash(key)
        SI_s = str(si_b2a(SI), "utf-8")
        encfile = os.path.join(self.basedir, "CHK_encoding", SI_s)
        # The context manager closes the file even if the write raises.
        with open(encfile, "wb") as f:
            f.write(aes.encrypt_data(encryptor, DATA))

        u = make_uploader(self.helper_furl, self.s)

        d = wait_a_few_turns()

        def _ready(res):
            self._assert_has_helper(u)
            return upload_data(u, DATA, convergence=b"test convergence string")
        d.addCallback(_ready)

        def _uploaded(results):
            the_uri = results.get_uri()
            self.assertIn(b"CHK", the_uri)
        d.addCallback(_uploaded)

        def _check_empty(res):
            self._assert_no_intermediate_files()
        d.addCallback(_check_empty)

        return d

    @inline_callbacks
    def test_already_uploaded(self):
        """
        If enough shares to satisfy the needed parameter already exist, the upload
        succeeds without pushing any shares.
        """
        params = FakeClient.DEFAULT_ENCODING_PARAMETERS
        chk_checker = partial(
            FakeCHKCheckerAndUEBFetcher,
            sharemap=dictutil.DictOfSets({
                0: {b"server0"},
                1: {b"server1"},
            }),
            ueb_data={
                "size": len(DATA),
                "segment_size": min(params["max_segment_size"], len(DATA)),
                "needed_shares": params["k"],
                "total_shares": params["n"],
            },
        )
        self.basedir = "helper/AssistedUpload/test_already_uploaded"
        self.setUpHelper(
            self.basedir,
            chk_checker=chk_checker,
        )
        u = make_uploader(self.helper_furl, self.s)

        yield wait_a_few_turns()

        self._assert_has_helper(u)

        results = yield upload_data(u, DATA, convergence=b"some convergence string")
        the_uri = results.get_uri()
        self.assertIn(b"CHK", the_uri)

        self._assert_no_intermediate_files()

        self.assertEqual(
            results.get_pushed_shares(),
            0,
        )
---|
387 | |
---|
388 | |
---|
class CHKCheckerAndUEBFetcherTests(SyncTestCase):
    """
    Tests for ``CHKCheckerAndUEBFetcher``.
    """
    def test_check_no_peers(self):
        """
        When the "peer getter" yields an empty list of peers,
        ``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred`` that fires
        with ``False``.
        """
        storage_index = b"a" * 16
        peers = {storage_index: []}
        caf = offloaded.CHKCheckerAndUEBFetcher(peers.get, storage_index, None)
        self.assertThat(caf.check(), succeeded(Equals(False)))

    @inline_callbacks
    def test_check_ueb_unavailable(self):
        """
        When none of the peers from the "peer getter" holds a readable UEB,
        ``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred`` that fires
        with ``False``.
        """
        storage_index = b"a" * 16
        serverid = b"b" * 20
        backend = StorageServer(self.mktemp(), serverid)
        rref_without_ueb = LocalWrapper(FoolscapStorageServer(backend), fireNow)
        yield write_bad_share(rref_without_ueb, storage_index)

        peers = {
            storage_index: [NoNetworkServer(serverid, rref_without_ueb)],
        }
        caf = offloaded.CHKCheckerAndUEBFetcher(peers.get, storage_index, None)
        self.assertThat(caf.check(), succeeded(Equals(False)))

    @inline_callbacks
    def test_not_enough_shares(self):
        """
        When fewer shares are found than are needed to reassemble the data,
        ``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred`` that fires
        with ``False``.
        """
        storage_index = b"a" * 16
        serverid = b"b" * 20
        backend = StorageServer(self.mktemp(), serverid)
        rref_with_ueb = LocalWrapper(FoolscapStorageServer(backend), fireNow)
        # Two shares are needed, but only share 0 gets written.
        ueb = {
            "needed_shares": 2,
            "total_shares": 2,
            "segment_size": 128 * 1024,
            "size": 1024,
        }
        yield write_good_share(rref_with_ueb, storage_index, ueb, [0])

        peers = {
            storage_index: [NoNetworkServer(serverid, rref_with_ueb)],
        }
        caf = offloaded.CHKCheckerAndUEBFetcher(peers.get, storage_index, None)
        self.assertThat(caf.check(), succeeded(Equals(False)))

    @inline_callbacks
    def test_enough_shares(self):
        """
        When enough shares are found to reassemble the data,
        ``CHKCheckerAndUEBFetcher.check`` returns a ``Deferred`` that fires
        with share and share placement information.
        """
        storage_index = b"a" * 16
        serverids = [ch * 20 for ch in [b"b", b"c"]]
        storages = [
            FoolscapStorageServer(StorageServer(self.mktemp(), serverid))
            for serverid in serverids
        ]
        rrefs_with_ueb = [
            LocalWrapper(storage, fireNow)
            for storage in storages
        ]
        ueb = {
            "needed_shares": len(serverids),
            "total_shares": len(serverids),
            "segment_size": 128 * 1024,
            "size": 1024,
        }
        # Server number i receives share number i.
        for sharenum, rref in enumerate(rrefs_with_ueb):
            yield write_good_share(rref, storage_index, ueb, [sharenum])

        servers_with_ueb = [
            NoNetworkServer(serverid, rref)
            for (serverid, rref) in zip(serverids, rrefs_with_ueb)
        ]
        peers = {storage_index: servers_with_ueb}
        caf = offloaded.CHKCheckerAndUEBFetcher(peers.get, storage_index, None)

        expected_sharemap = {
            sharenum: {serverid}
            for (sharenum, serverid) in enumerate(serverids)
        }
        self.assertThat(
            caf.check(),
            succeeded(MatchesListwise([
                Equals(expected_sharemap),
                Equals(ueb),
                IsInstance(bytes),
            ])),
        )
---|
521 | |
---|
522 | |
---|
def write_bad_share(storage_rref, storage_index):
    """
    Write share number 0 with contents that hold no usable URI extension
    block.

    :return: Whatever ``write_share`` returns for the write.
    """
    # 1024 NUL bytes: trash in the right bucket on this storage server, with
    # no recoverable UEB in it.
    trash = b"\0" * 1024
    return write_share(storage_rref, storage_index, [0], trash)
---|
530 | |
---|
531 | |
---|
def write_good_share(storage_rref, storage_index, ueb, sharenums):
    """
    Write a share carrying the given URI extension block.

    The share data is the write proxy's offset table, zero padding up to the
    proxy's ``uri_extension`` offset, the packed length of the UEB and the
    packed UEB itself.

    :param dict ueb: The URI extension block fields; ``segment_size`` and
        ``size`` are also used to construct the write proxy.

    :return: Whatever ``write_share`` returns for the write.
    """
    write_proxy = make_write_bucket_proxy(
        storage_rref,
        None,
        1024,
        ueb["segment_size"],
        1,
        1,
        ueb["size"],
    )
    # See allmydata/immutable/layout.py
    header = write_proxy._offset_data
    padding_length = write_proxy._offsets["uri_extension"] - len(header)
    padding = b"\0" * padding_length
    packed_ueb = uri.pack_extension(ueb)
    length_prefix = pack(write_proxy.fieldstruct, len(packed_ueb))
    sharedata = header + padding + length_prefix + packed_ueb
    return write_share(storage_rref, storage_index, sharenums, sharedata)
---|
556 | |
---|
557 | |
---|
@inline_callbacks
def write_share(storage_rref, storage_index, sharenums, sharedata):
    """
    Allocate a bucket through the given IStorageServer remote reference and
    write the given share data into it.

    :param foolscap.ipb.IRemoteReference storage_rref: A remote reference to
        an IStorageServer.

    :param bytes storage_index: The storage index under which the share is
        written.

    :param [int] sharenums: The share numbers to allocate.  Exactly one
        bucket writer must come back from the allocation, otherwise the
        unpacking below raises.

    :param bytes sharedata: The ciphertext to write as the share.
    """
    allocation = yield storage_rref.callRemote(
        "allocate_buckets",
        storage_index,
        b"x" * 16,
        b"x" * 16,
        sharenums,
        len(sharedata),
        LocalWrapper(None),
    )
    _unused, bucket_writers = allocation
    (bucket_writer,) = bucket_writers.values()
    # The whole share goes in at offset 0, then the bucket is closed.
    yield bucket_writer.callRemote("write", 0, sharedata)
    yield bucket_writer.callRemote("close")
---|