1 | diff '--context=4' --new-file ticket999-v13a/src/allmydata/storage/backends/s3/immutable.py ticket999-v16/src/allmydata/storage/backends/s3/immutable.py |
---|
2 | *** ticket999-v13a/src/allmydata/storage/backends/s3/immutable.py 2011-09-28 00:34:12.806614999 +0100 |
---|
3 | --- ticket999-v16/src/allmydata/storage/backends/s3/immutable.py 2011-09-29 05:13:51.646615008 +0100 |
---|
4 | *************** |
---|
5 | *** 1,13 **** |
---|
6 | |
---|
7 | import struct |
---|
8 | |
---|
9 | ! from zope.interface import implements |
---|
10 | |
---|
11 | from allmydata.interfaces import IStoredShare |
---|
12 | |
---|
13 | from allmydata.util.assertutil import precondition |
---|
14 | from allmydata.storage.common import si_b2a, UnknownImmutableContainerVersionError, DataTooLargeError |
---|
15 | |
---|
16 | |
---|
17 | # Each share file (with key 'shares/$PREFIX/$STORAGEINDEX/$SHNUM') contains |
---|
18 | # lease information [currently inaccessible] and share data. The share data is |
---|
19 | --- 1,15 ---- |
---|
20 | |
---|
21 | import struct |
---|
22 | |
---|
23 | ! from twisted.internet import defer |
---|
24 | |
---|
25 | + from zope.interface import implements |
---|
26 | from allmydata.interfaces import IStoredShare |
---|
27 | |
---|
28 | from allmydata.util.assertutil import precondition |
---|
29 | from allmydata.storage.common import si_b2a, UnknownImmutableContainerVersionError, DataTooLargeError |
---|
30 | + from allmydata.storage.backends.s3.s3_common import get_s3_share_key |
---|
31 | |
---|
32 | |
---|
33 | # Each share file (with key 'shares/$PREFIX/$STORAGEINDEX/$SHNUM') contains |
---|
34 | # lease information [currently inaccessible] and share data. The share data is |
---|
35 | *************** |
---|
36 | *** 28,77 **** |
---|
37 | LEASE_SIZE = struct.calcsize(">L32s32sL") # for compatibility |
---|
38 | HEADER = ">LLL" |
---|
39 | HEADER_SIZE = struct.calcsize(HEADER) |
---|
40 | |
---|
41 | ! def __init__(self, storageindex, shnum, s3bucket, max_size=None, data=None): |
---|
42 | """ |
---|
43 | If max_size is not None then I won't allow more than max_size to be written to me. |
---|
44 | """ |
---|
45 | ! precondition((max_size is not None) or (data is not None), max_size, data) |
---|
46 | self._storageindex = storageindex |
---|
47 | self._shnum = shnum |
---|
48 | - self._s3bucket = s3bucket |
---|
49 | self._max_size = max_size |
---|
50 | self._data = data |
---|
51 | |
---|
52 | ! sistr = self.get_storage_index_string() |
---|
53 | ! self._key = "shares/%s/%s/%d" % (sistr[:2], sistr, shnum) |
---|
54 | |
---|
55 | ! if data is None: # creating share |
---|
56 | # The second field, which was the four-byte share data length in |
---|
57 | # Tahoe-LAFS versions prior to 1.3.0, is not used; we always write 0. |
---|
58 | # We also write 0 for the number of leases. |
---|
59 | self._home.setContent(struct.pack(self.HEADER, 1, 0, 0) ) |
---|
60 | ! self._end_offset = self.HEADER_SIZE + max_size |
---|
61 | self._size = self.HEADER_SIZE |
---|
62 | self._writes = [] |
---|
63 | else: |
---|
64 | ! (version, unused, num_leases) = struct.unpack(self.HEADER, data[:self.HEADER_SIZE]) |
---|
65 | |
---|
66 | if version != 1: |
---|
67 | msg = "%r had version %d but we wanted 1" % (self, version) |
---|
68 | raise UnknownImmutableContainerVersionError(msg) |
---|
69 | |
---|
70 | # We cannot write leases in share files, but allow them to be present |
---|
71 | # in case a share file is copied from a disk backend, or in case we |
---|
72 | # need them in future. |
---|
73 | ! self._size = len(data) |
---|
74 | self._end_offset = self._size - (num_leases * self.LEASE_SIZE) |
---|
75 | ! self._data_offset = self.HEADER_SIZE |
---|
76 | ! |
---|
77 | ! def __repr__(self): |
---|
78 | ! return ("<ImmutableS3Share at %r>" % (self._key,)) |
---|
79 | |
---|
80 | def close(self): |
---|
81 | ! # TODO: finalize write to S3. |
---|
82 | ! pass |
---|
83 | |
---|
84 | def get_used_space(self): |
---|
85 | return self._size |
---|
86 | |
---|
87 | --- 30,101 ---- |
---|
88 | LEASE_SIZE = struct.calcsize(">L32s32sL") # for compatibility |
---|
89 | HEADER = ">LLL" |
---|
90 | HEADER_SIZE = struct.calcsize(HEADER) |
---|
91 | |
---|
92 | ! def __init__(self, s3bucket, storageindex, shnum, max_size=None, data=None): |
---|
93 | """ |
---|
94 | If max_size is not None then I won't allow more than max_size to be written to me. |
---|
95 | + |
---|
96 | + Clients should use the load_immutable_s3_share and create_immutable_s3_share |
---|
97 | + factory functions rather than creating instances directly. |
---|
98 | """ |
---|
99 | ! self._s3bucket = s3bucket |
---|
100 | self._storageindex = storageindex |
---|
101 | self._shnum = shnum |
---|
102 | self._max_size = max_size |
---|
103 | self._data = data |
---|
104 | + self._key = get_s3_share_key(storageindex, shnum) |
---|
105 | + self._data_offset = self.HEADER_SIZE |
---|
106 | + self._loaded = False |
---|
107 | |
---|
108 | ! def __repr__(self): |
---|
109 | ! return ("<ImmutableS3Share at %r>" % (self._key,)) |
---|
110 | |
---|
111 | ! def load(self): |
---|
112 | ! if self._max_size is not None: # creating share |
---|
113 | # The second field, which was the four-byte share data length in |
---|
114 | # Tahoe-LAFS versions prior to 1.3.0, is not used; we always write 0. |
---|
115 | # We also write 0 for the number of leases. |
---|
116 | self._home.setContent(struct.pack(self.HEADER, 1, 0, 0) ) |
---|
117 | ! self._end_offset = self.HEADER_SIZE + self._max_size |
---|
118 | self._size = self.HEADER_SIZE |
---|
119 | self._writes = [] |
---|
120 | + self._loaded = True |
---|
121 | + return defer.succeed(None) |
---|
122 | + |
---|
123 | + if self._data is None: |
---|
124 | + # If we don't already have the data, get it from S3. |
---|
125 | + d = self._s3bucket.get_object(self._key) |
---|
126 | else: |
---|
127 | ! d = defer.succeed(self._data) |
---|
128 | ! |
---|
129 | ! def _got_data(data): |
---|
130 | ! self._data = data |
---|
131 | ! header = self._data[:self.HEADER_SIZE] |
---|
132 | ! (version, unused, num_leases) = struct.unpack(self.HEADER, header) |
---|
133 | |
---|
134 | if version != 1: |
---|
135 | msg = "%r had version %d but we wanted 1" % (self, version) |
---|
136 | raise UnknownImmutableContainerVersionError(msg) |
---|
137 | |
---|
138 | # We cannot write leases in share files, but allow them to be present |
---|
139 | # in case a share file is copied from a disk backend, or in case we |
---|
140 | # need them in future. |
---|
141 | ! self._size = len(self._data) |
---|
142 | self._end_offset = self._size - (num_leases * self.LEASE_SIZE) |
---|
143 | ! self._loaded = True |
---|
144 | ! d.addCallback(_got_data) |
---|
145 | ! return d |
---|
146 | |
---|
147 | def close(self): |
---|
148 | ! # This will briefly use memory equal to double the share size. |
---|
149 | ! # We really want to stream writes to S3, but I don't think txaws supports that yet |
---|
150 | ! # (and neither does IS3Bucket, since that's a thin wrapper over the txaws S3 API). |
---|
151 | ! |
---|
152 | ! self._data = "".join(self._writes) |
---|
153 | ! del self._writes |
---|
154 | ! self._s3bucket.put_object(self._key, self._data) |
---|
155 | ! return defer.succeed(None) |
---|
156 | |
---|
157 | def get_used_space(self): |
---|
158 | return self._size |
---|
159 | |
---|
160 | *************** |
---|
161 | *** 101,109 **** |
---|
162 | def readv(self, readv): |
---|
163 | datav = [] |
---|
164 | for (offset, length) in readv: |
---|
165 | datav.append(self.read_share_data(offset, length)) |
---|
166 | ! return datav |
---|
167 | |
---|
168 | def read_share_data(self, offset, length): |
---|
169 | precondition(offset >= 0) |
---|
170 | |
---|
171 | --- 125,133 ---- |
---|
172 | def readv(self, readv): |
---|
173 | datav = [] |
---|
174 | for (offset, length) in readv: |
---|
175 | datav.append(self.read_share_data(offset, length)) |
---|
176 | ! return defer.succeed(datav) |
---|
177 | |
---|
178 | def read_share_data(self, offset, length): |
---|
179 | precondition(offset >= 0) |
---|
180 | |
---|
181 | *************** |
---|
182 | *** 111,122 **** |
---|
183 | # beyond the end of the data return an empty string. |
---|
184 | seekpos = self._data_offset+offset |
---|
185 | actuallength = max(0, min(length, self._end_offset-seekpos)) |
---|
186 | if actuallength == 0: |
---|
187 | ! return "" |
---|
188 | ! |
---|
189 | ! # TODO: perform an S3 GET request, possibly with a Content-Range header. |
---|
190 | ! return "\x00"*actuallength |
---|
191 | |
---|
192 | def write_share_data(self, offset, data): |
---|
193 | length = len(data) |
---|
194 | precondition(offset >= self._size, "offset = %r, size = %r" % (offset, self._size)) |
---|
195 | --- 135,144 ---- |
---|
196 | # beyond the end of the data return an empty string. |
---|
197 | seekpos = self._data_offset+offset |
---|
198 | actuallength = max(0, min(length, self._end_offset-seekpos)) |
---|
199 | if actuallength == 0: |
---|
200 | ! return defer.succeed("") |
---|
201 | ! return defer.succeed(self._data[offset:offset+actuallength]) |
---|
202 | |
---|
203 | def write_share_data(self, offset, data): |
---|
204 | length = len(data) |
---|
205 | precondition(offset >= self._size, "offset = %r, size = %r" % (offset, self._size)) |
---|
206 | *************** |
---|
207 | *** 126,132 **** |
---|
208 | --- 148,162 ---- |
---|
209 | if offset > self._size: |
---|
210 | self._writes.append("\x00" * (offset - self._size)) |
---|
211 | self._writes.append(data) |
---|
212 | self._size = offset + len(data) |
---|
213 | + return defer.succeed(None) |
---|
214 | |
---|
215 | def add_lease(self, lease_info): |
---|
216 | pass |
---|
217 | + |
---|
218 | + |
---|
219 | + def load_immutable_s3_share(s3bucket, storageindex, shnum, data=None): |
---|
220 | + return ImmutableS3Share(s3bucket, storageindex, shnum, data=data).load() |
---|
221 | + |
---|
222 | + def create_immutable_s3_share(s3bucket, storageindex, shnum, max_size): |
---|
223 | + return ImmutableS3Share(s3bucket, storageindex, shnum, max_size=max_size).load() |
---|
224 | diff '--context=4' --new-file ticket999-v13a/src/allmydata/storage/backends/s3/mutable.py ticket999-v16/src/allmydata/storage/backends/s3/mutable.py |
---|
225 | *** ticket999-v13a/src/allmydata/storage/backends/s3/mutable.py 2011-09-28 00:34:12.796615003 +0100 |
---|
226 | --- ticket999-v16/src/allmydata/storage/backends/s3/mutable.py 2011-09-29 05:13:51.656615001 +0100 |
---|
227 | *************** |
---|
228 | *** 1,7 **** |
---|
229 | --- 1,9 ---- |
---|
230 | |
---|
231 | import struct |
---|
232 | |
---|
233 | + from twisted.internet import defer |
---|
234 | + |
---|
235 | from zope.interface import implements |
---|
236 | |
---|
237 | from allmydata.interfaces import IStoredMutableShare, BadWriteEnablerError |
---|
238 | from allmydata.util import fileutil, idlib, log |
---|
239 | *************** |
---|
240 | *** 11,18 **** |
---|
241 | --- 13,21 ---- |
---|
242 | from allmydata.storage.common import si_b2a, UnknownMutableContainerVersionError, \ |
---|
243 | DataTooLargeError |
---|
244 | from allmydata.storage.lease import LeaseInfo |
---|
245 | from allmydata.storage.backends.base import testv_compare |
---|
246 | + from allmydata.mutable.layout import MUTABLE_MAGIC |
---|
247 | |
---|
248 | |
---|
249 | # The MutableS3Share is like the ImmutableS3Share, but used for mutable data. |
---|
250 | # It has a different layout. See docs/mutable.rst for more details. |
---|
251 | *************** |
---|
252 | *** 51,67 **** |
---|
253 | assert LEASE_SIZE == 92 |
---|
254 | DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE |
---|
255 | assert DATA_OFFSET == 468, DATA_OFFSET |
---|
256 | |
---|
257 | ! # our sharefiles share with a recognizable string, plus some random |
---|
258 | ! # binary data to reduce the chance that a regular text file will look |
---|
259 | ! # like a sharefile. |
---|
260 | ! MAGIC = "Tahoe mutable container v1\n" + "\x75\x09\x44\x03\x8e" |
---|
261 | assert len(MAGIC) == 32 |
---|
262 | MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary |
---|
263 | # TODO: decide upon a policy for max share size |
---|
264 | |
---|
265 | ! def __init__(self, storageindex, shnum, home, parent=None): |
---|
266 | self._storageindex = storageindex |
---|
267 | self._shnum = shnum |
---|
268 | self._home = home |
---|
269 | if self._home.exists(): |
---|
270 | --- 54,71 ---- |
---|
271 | assert LEASE_SIZE == 92 |
---|
272 | DATA_OFFSET = HEADER_SIZE + 4*LEASE_SIZE |
---|
273 | assert DATA_OFFSET == 468, DATA_OFFSET |
---|
274 | |
---|
275 | ! MAGIC = MUTABLE_MAGIC |
---|
276 | assert len(MAGIC) == 32 |
---|
277 | MAX_SIZE = 2*1000*1000*1000 # 2GB, kind of arbitrary |
---|
278 | # TODO: decide upon a policy for max share size |
---|
279 | |
---|
280 | ! def __init__(self, home, storageindex, shnum, parent=None): |
---|
281 | ! """ |
---|
282 | ! Clients should use the load_mutable_s3_share and create_mutable_s3_share |
---|
283 | ! factory functions rather than creating instances directly. |
---|
284 | ! """ |
---|
285 | self._storageindex = storageindex |
---|
286 | self._shnum = shnum |
---|
287 | self._home = home |
---|
288 | if self._home.exists(): |
---|
289 | *************** |
---|
290 | *** 79,91 **** |
---|
291 | --- 83,100 ---- |
---|
292 | raise UnknownMutableContainerVersionError(msg) |
---|
293 | finally: |
---|
294 | f.close() |
---|
295 | self.parent = parent # for logging |
---|
296 | + self._loaded = False |
---|
297 | |
---|
298 | def log(self, *args, **kwargs): |
---|
299 | if self.parent: |
---|
300 | return self.parent.log(*args, **kwargs) |
---|
301 | |
---|
302 | + def load(self): |
---|
303 | + self._loaded = True |
---|
304 | + return defer.succeed(self) |
---|
305 | + |
---|
306 | def create(self, serverid, write_enabler): |
---|
307 | assert not self._home.exists() |
---|
308 | data_length = 0 |
---|
309 | extra_lease_offset = (self.HEADER_SIZE |
---|
310 | *************** |
---|
311 | *** 105,118 **** |
---|
312 | --- 114,130 ---- |
---|
313 | f.write(struct.pack(">L", num_extra_leases)) |
---|
314 | # extra leases go here, none at creation |
---|
315 | finally: |
---|
316 | f.close() |
---|
317 | + self._loaded = True |
---|
318 | + return defer.succeed(self) |
---|
319 | |
---|
320 | def __repr__(self): |
---|
321 | return ("<MutableS3Share %s:%r at %s>" |
---|
322 | % (si_b2a(self._storageindex), self._shnum, quote_filepath(self._home))) |
---|
323 | |
---|
324 | def get_used_space(self): |
---|
325 | + assert self._loaded |
---|
326 | return fileutil.get_used_space(self._home) |
---|
327 | |
---|
328 | def get_storage_index(self): |
---|
329 | return self._storageindex |
---|
330 | *************** |
---|
331 | *** 124,131 **** |
---|
332 | --- 136,144 ---- |
---|
333 | return self._shnum |
---|
334 | |
---|
335 | def unlink(self): |
---|
336 | self._home.remove() |
---|
337 | + return defer.succeed(None) |
---|
338 | |
---|
339 | def _read_data_length(self, f): |
---|
340 | f.seek(self.DATA_LENGTH_OFFSET) |
---|
341 | (data_length,) = struct.unpack(">Q", f.read(8)) |
---|
342 | *************** |
---|
343 | *** 325,338 **** |
---|
344 | for (offset, length) in readv: |
---|
345 | datav.append(self._read_share_data(f, offset, length)) |
---|
346 | finally: |
---|
347 | f.close() |
---|
348 | ! return datav |
---|
349 | |
---|
350 | def get_size(self): |
---|
351 | return self._home.getsize() |
---|
352 | |
---|
353 | def get_data_length(self): |
---|
354 | f = self._home.open('rb') |
---|
355 | try: |
---|
356 | data_length = self._read_data_length(f) |
---|
357 | finally: |
---|
358 | --- 338,353 ---- |
---|
359 | for (offset, length) in readv: |
---|
360 | datav.append(self._read_share_data(f, offset, length)) |
---|
361 | finally: |
---|
362 | f.close() |
---|
363 | ! return defer.succeed(datav) |
---|
364 | |
---|
365 | def get_size(self): |
---|
366 | + assert self._loaded |
---|
367 | return self._home.getsize() |
---|
368 | |
---|
369 | def get_data_length(self): |
---|
370 | + assert self._loaded |
---|
371 | f = self._home.open('rb') |
---|
372 | try: |
---|
373 | data_length = self._read_data_length(f) |
---|
374 | finally: |
---|
375 | *************** |
---|
376 | *** 357,364 **** |
---|
377 | --- 372,380 ---- |
---|
378 | nodeid=idlib.nodeid_b2a(write_enabler_nodeid)) |
---|
379 | msg = "The write enabler was recorded by nodeid '%s'." % \ |
---|
380 | (idlib.nodeid_b2a(write_enabler_nodeid),) |
---|
381 | raise BadWriteEnablerError(msg) |
---|
382 | + return defer.succeed(None) |
---|
383 | |
---|
384 | def check_testv(self, testv): |
---|
385 | test_good = True |
---|
386 | f = self._home.open('rb+') |
---|
387 | *************** |
---|
388 | *** 369,377 **** |
---|
389 | test_good = False |
---|
390 | break |
---|
391 | finally: |
---|
392 | f.close() |
---|
393 | ! return test_good |
---|
394 | |
---|
395 | def writev(self, datav, new_length): |
---|
396 | f = self._home.open('rb+') |
---|
397 | try: |
---|
398 | --- 385,393 ---- |
---|
399 | test_good = False |
---|
400 | break |
---|
401 | finally: |
---|
402 | f.close() |
---|
403 | ! return defer.succeed(test_good) |
---|
404 | |
---|
405 | def writev(self, datav, new_length): |
---|
406 | f = self._home.open('rb+') |
---|
407 | try: |
---|
408 | *************** |
---|
409 | *** 385,398 **** |
---|
410 | # share data has shrunk, then call |
---|
411 | # self._change_container_size() here. |
---|
412 | finally: |
---|
413 | f.close() |
---|
414 | |
---|
415 | def close(self): |
---|
416 | ! pass |
---|
417 | |
---|
418 | |
---|
419 | ! def create_mutable_s3_share(storageindex, shnum, fp, serverid, write_enabler, parent): |
---|
420 | ! ms = MutableS3Share(storageindex, shnum, fp, parent) |
---|
421 | ! ms.create(serverid, write_enabler) |
---|
422 | ! del ms |
---|
423 | ! return MutableS3Share(storageindex, shnum, fp, parent) |
---|
424 | --- 401,415 ---- |
---|
425 | # share data has shrunk, then call |
---|
426 | # self._change_container_size() here. |
---|
427 | finally: |
---|
428 | f.close() |
---|
429 | + return defer.succeed(None) |
---|
430 | |
---|
431 | def close(self): |
---|
432 | ! return defer.succeed(None) |
---|
433 | ! |
---|
434 | |
---|
435 | + def load_mutable_s3_share(home, storageindex=None, shnum=None, parent=None): |
---|
436 | + return MutableS3Share(home, storageindex, shnum, parent).load() |
---|
437 | |
---|
438 | ! def create_mutable_s3_share(home, serverid, write_enabler, storageindex=None, shnum=None, parent=None): |
---|
439 | ! return MutableS3Share(home, storageindex, shnum, parent).create(serverid, write_enabler) |
---|
440 | diff '--context=4' --new-file ticket999-v13a/src/allmydata/storage/backends/s3/s3_backend.py ticket999-v16/src/allmydata/storage/backends/s3/s3_backend.py |
---|
441 | *** ticket999-v13a/src/allmydata/storage/backends/s3/s3_backend.py 2011-09-28 00:34:12.806614999 +0100 |
---|
442 | --- ticket999-v16/src/allmydata/storage/backends/s3/s3_backend.py 2011-09-29 05:13:52.626615006 +0100 |
---|
443 | *************** |
---|
444 | *** 1,63 **** |
---|
445 | |
---|
446 | ! import re |
---|
447 | ! |
---|
448 | ! from zope.interface import implements, Interface |
---|
449 | from allmydata.interfaces import IStorageBackend, IShareSet |
---|
450 | |
---|
451 | from allmydata.storage.common import si_a2b |
---|
452 | from allmydata.storage.bucket import BucketWriter |
---|
453 | from allmydata.storage.backends.base import Backend, ShareSet |
---|
454 | ! from allmydata.storage.backends.s3.immutable import ImmutableS3Share |
---|
455 | ! from allmydata.storage.backends.s3.mutable import MutableS3Share |
---|
456 | ! |
---|
457 | ! # The S3 bucket has keys of the form shares/$PREFIX/$STORAGEINDEX/$SHNUM . |
---|
458 | ! |
---|
459 | ! NUM_RE=re.compile("^[0-9]+$") |
---|
460 | ! |
---|
461 | ! |
---|
462 | ! class IS3Bucket(Interface): |
---|
463 | ! """ |
---|
464 | ! I represent an S3 bucket. |
---|
465 | ! """ |
---|
466 | ! def create(self): |
---|
467 | ! """ |
---|
468 | ! Create this bucket. |
---|
469 | ! """ |
---|
470 | ! |
---|
471 | ! def delete(self): |
---|
472 | ! """ |
---|
473 | ! Delete this bucket. |
---|
474 | ! The bucket must be empty before it can be deleted. |
---|
475 | ! """ |
---|
476 | ! |
---|
477 | ! def list_objects(self, prefix=""): |
---|
478 | ! """ |
---|
479 | ! Get a list of all the objects in this bucket whose object names start with |
---|
480 | ! the given prefix. |
---|
481 | ! """ |
---|
482 | ! |
---|
483 | ! def put_object(self, object_name, data, content_type=None, metadata={}): |
---|
484 | ! """ |
---|
485 | ! Put an object in this bucket. |
---|
486 | ! Any existing object of the same name will be replaced. |
---|
487 | ! """ |
---|
488 | ! |
---|
489 | ! def get_object(self, object_name): |
---|
490 | ! """ |
---|
491 | ! Get an object from this bucket. |
---|
492 | ! """ |
---|
493 | ! |
---|
494 | ! def head_object(self, object_name): |
---|
495 | ! """ |
---|
496 | ! Retrieve object metadata only. |
---|
497 | ! """ |
---|
498 | ! |
---|
499 | ! def delete_object(self, object_name): |
---|
500 | ! """ |
---|
501 | ! Delete an object from this bucket. |
---|
502 | ! Once deleted, there is no method to restore or undelete an object. |
---|
503 | ! """ |
---|
504 | |
---|
505 | |
---|
506 | class S3Backend(Backend): |
---|
507 | implements(IStorageBackend) |
---|
508 | --- 1,16 ---- |
---|
509 | |
---|
510 | ! from zope.interface import implements |
---|
511 | from allmydata.interfaces import IStorageBackend, IShareSet |
---|
512 | |
---|
513 | + from allmydata.util.deferredutil import gatherResults |
---|
514 | from allmydata.storage.common import si_a2b |
---|
515 | from allmydata.storage.bucket import BucketWriter |
---|
516 | from allmydata.storage.backends.base import Backend, ShareSet |
---|
517 | ! from allmydata.storage.backends.s3.immutable import load_immutable_s3_share, create_immutable_s3_share |
---|
518 | ! from allmydata.storage.backends.s3.mutable import load_mutable_s3_share, create_mutable_s3_share |
---|
519 | ! from allmydata.storage.backends.s3.s3_common import get_s3_share_key, NUM_RE |
---|
520 | ! from allmydata.mutable.layout import MUTABLE_MAGIC |
---|
521 | |
---|
522 | |
---|
523 | class S3Backend(Backend): |
---|
524 | implements(IStorageBackend) |
---|
525 | *************** |
---|
526 | *** 74,83 **** |
---|
527 | # we don't actually create the corruption-advisory dir until necessary |
---|
528 | self._corruption_advisory_dir = corruption_advisory_dir |
---|
529 | |
---|
530 | def get_sharesets_for_prefix(self, prefix): |
---|
531 | ! # TODO: query S3 for keys matching prefix |
---|
532 | ! return [] |
---|
533 | |
---|
534 | def get_shareset(self, storageindex): |
---|
535 | return S3ShareSet(storageindex, self._s3bucket) |
---|
536 | |
---|
537 | --- 27,53 ---- |
---|
538 | # we don't actually create the corruption-advisory dir until necessary |
---|
539 | self._corruption_advisory_dir = corruption_advisory_dir |
---|
540 | |
---|
541 | def get_sharesets_for_prefix(self, prefix): |
---|
542 | ! # XXX crawler.py needs to be changed to handle a Deferred return from this method. |
---|
543 | ! |
---|
544 | ! d = self._s3bucket.list_objects('shares/%s/' % (prefix,), '/') |
---|
545 | ! def _get_sharesets(res): |
---|
546 | ! # XXX this enumerates all shares to get the set of SIs. |
---|
547 | ! # Is there a way to enumerate SIs more efficiently? |
---|
548 | ! si_strings = set() |
---|
549 | ! for item in res.contents: |
---|
550 | ! # XXX better error handling |
---|
551 | ! path = item.key.split('/') |
---|
552 | ! assert path[0:2] == ["shares", prefix] |
---|
553 | ! si_strings.add(path[2]) |
---|
554 | ! |
---|
555 | ! # XXX we want this to be deterministic, so we return the sharesets sorted |
---|
556 | ! # by their si_strings, but we shouldn't need to explicitly re-sort them |
---|
557 | ! # because list_objects returns a sorted list. |
---|
558 | ! return [S3ShareSet(si_a2b(s), self._s3bucket) for s in sorted(si_strings)] |
---|
559 | ! d.addCallback(_get_sharesets) |
---|
560 | ! return d |
---|
561 | |
---|
562 | def get_shareset(self, storageindex): |
---|
563 | return S3ShareSet(storageindex, self._s3bucket) |
---|
564 | |
---|
565 | *************** |
---|
566 | *** 99,108 **** |
---|
567 | |
---|
568 | def __init__(self, storageindex, s3bucket): |
---|
569 | ShareSet.__init__(self, storageindex) |
---|
570 | self._s3bucket = s3bucket |
---|
571 | ! sistr = self.get_storage_index_string() |
---|
572 | ! self._key = 'shares/%s/%s/' % (sistr[:2], sistr) |
---|
573 | |
---|
574 | def get_overhead(self): |
---|
575 | return 0 |
---|
576 | |
---|
577 | --- 69,77 ---- |
---|
578 | |
---|
579 | def __init__(self, storageindex, s3bucket): |
---|
580 | ShareSet.__init__(self, storageindex) |
---|
581 | self._s3bucket = s3bucket |
---|
582 | ! self._key = get_s3_share_key(storageindex) |
---|
583 | |
---|
584 | def get_overhead(self): |
---|
585 | return 0 |
---|
586 | |
---|
587 | *************** |
---|
588 | *** 110,134 **** |
---|
589 | """ |
---|
590 | Generate IStorageBackendShare objects for shares we have for this storage index. |
---|
591 | ("Shares we have" means completed ones, excluding incoming ones.) |
---|
592 | """ |
---|
593 | ! pass |
---|
594 | |
---|
595 | def has_incoming(self, shnum): |
---|
596 | # TODO: this might need to be more like the disk backend; review callers |
---|
597 | return False |
---|
598 | |
---|
599 | def make_bucket_writer(self, storageserver, shnum, max_space_per_bucket, lease_info, canary): |
---|
600 | ! immsh = ImmutableS3Share(self.get_storage_index(), shnum, self._s3bucket, |
---|
601 | ! max_size=max_space_per_bucket) |
---|
602 | ! bw = BucketWriter(storageserver, immsh, lease_info, canary) |
---|
603 | ! return bw |
---|
604 | |
---|
605 | def _create_mutable_share(self, storageserver, shnum, write_enabler): |
---|
606 | - # TODO |
---|
607 | serverid = storageserver.get_serverid() |
---|
608 | ! return MutableS3Share(self.get_storage_index(), shnum, self._s3bucket, serverid, |
---|
609 | ! write_enabler, storageserver) |
---|
610 | |
---|
611 | def _clean_up_after_unlink(self): |
---|
612 | pass |
---|
613 | |
---|
614 | --- 79,130 ---- |
---|
615 | """ |
---|
616 | Generate IStorageBackendShare objects for shares we have for this storage index. |
---|
617 | ("Shares we have" means completed ones, excluding incoming ones.) |
---|
618 | """ |
---|
619 | ! d = self._s3bucket.list_objects(self._key, '/') |
---|
620 | ! def _get_shares(res): |
---|
621 | ! # XXX this enumerates all shares to get the set of SIs. |
---|
622 | ! # Is there a way to enumerate SIs more efficiently? |
---|
623 | ! shnums = [] |
---|
624 | ! for item in res.contents: |
---|
625 | ! assert item.key.startswith(self._key), item.key |
---|
626 | ! path = item.key.split('/') |
---|
627 | ! if len(path) == 4: |
---|
628 | ! shnumstr = path[3] |
---|
629 | ! if NUM_RE.match(shnumstr): |
---|
630 | ! shnums.add(int(shnumstr)) |
---|
631 | ! |
---|
632 | ! return gatherResults([self._load_share(shnum) for shnum in sorted(shnums)]) |
---|
633 | ! d.addCallback(_get_shares) |
---|
634 | ! return d |
---|
635 | ! |
---|
636 | ! def _load_share(self, shnum): |
---|
637 | ! d = self._s3bucket.get_object(self._key + str(shnum)) |
---|
638 | ! def _make_share(data): |
---|
639 | ! if data.startswith(MUTABLE_MAGIC): |
---|
640 | ! return load_mutable_s3_share(self._s3bucket, self._storageindex, shnum, data=data) |
---|
641 | ! else: |
---|
642 | ! # assume it's immutable |
---|
643 | ! return load_immutable_s3_share(self._s3bucket, self._storageindex, shnum, data=data) |
---|
644 | ! d.addCallback(_make_share) |
---|
645 | ! return d |
---|
646 | |
---|
647 | def has_incoming(self, shnum): |
---|
648 | # TODO: this might need to be more like the disk backend; review callers |
---|
649 | return False |
---|
650 | |
---|
651 | def make_bucket_writer(self, storageserver, shnum, max_space_per_bucket, lease_info, canary): |
---|
652 | ! d = create_immutable_s3_share(self._s3bucket, self.get_storage_index(), shnum, |
---|
653 | ! max_size=max_space_per_bucket) |
---|
654 | ! def _created(immsh): |
---|
655 | ! return BucketWriter(storageserver, immsh, lease_info, canary) |
---|
656 | ! d.addCallback(_created) |
---|
657 | ! return d |
---|
658 | |
---|
659 | def _create_mutable_share(self, storageserver, shnum, write_enabler): |
---|
660 | serverid = storageserver.get_serverid() |
---|
661 | ! return create_mutable_s3_share(self._s3bucket, self.get_storage_index(), shnum, serverid, |
---|
662 | ! write_enabler, storageserver) |
---|
663 | |
---|
664 | def _clean_up_after_unlink(self): |
---|
665 | pass |
---|
666 | |
---|
667 | diff '--context=4' --new-file ticket999-v13a/src/allmydata/storage/backends/s3/s3_common.py ticket999-v16/src/allmydata/storage/backends/s3/s3_common.py |
---|
668 | *** ticket999-v13a/src/allmydata/storage/backends/s3/s3_common.py 1970-01-01 01:00:00.000000000 +0100 |
---|
669 | --- ticket999-v16/src/allmydata/storage/backends/s3/s3_common.py 2011-09-29 05:13:51.666614998 +0100 |
---|
670 | *************** |
---|
671 | *** 0 **** |
---|
672 | --- 1,62 ---- |
---|
673 | + |
---|
674 | + import re |
---|
675 | + |
---|
676 | + from zope.interface import Interface |
---|
677 | + |
---|
678 | + from allmydata.storage.common import si_b2a |
---|
679 | + |
---|
680 | + |
---|
681 | + # The S3 bucket has keys of the form shares/$PREFIX/$STORAGEINDEX/$SHNUM . |
---|
682 | + |
---|
683 | + def get_s3_share_key(si, shnum=None): |
---|
684 | + sistr = si_b2a(si) |
---|
685 | + if shnum is None: |
---|
686 | + return "shares/%s/%s/" % (sistr[:2], sistr) |
---|
687 | + else: |
---|
688 | + return "shares/%s/%s/%d" % (sistr[:2], sistr, shnum) |
---|
689 | + |
---|
690 | + NUM_RE=re.compile("^[0-9]+$") |
---|
691 | + |
---|
692 | + |
---|
693 | + class IS3Bucket(Interface): |
---|
694 | + """ |
---|
695 | + I represent an S3 bucket. |
---|
696 | + """ |
---|
697 | + def create(self): |
---|
698 | + """ |
---|
699 | + Create this bucket. |
---|
700 | + """ |
---|
701 | + |
---|
702 | + def delete(self): |
---|
703 | + """ |
---|
704 | + Delete this bucket. |
---|
705 | + The bucket must be empty before it can be deleted. |
---|
706 | + """ |
---|
707 | + |
---|
708 | + def list_objects(self, prefix=""): |
---|
709 | + """ |
---|
710 | + Get a list of all the objects in this bucket whose object names start with |
---|
711 | + the given prefix. |
---|
712 | + """ |
---|
713 | + |
---|
714 | + def put_object(self, object_name, data, content_type=None, metadata={}): |
---|
715 | + """ |
---|
716 | + Put an object in this bucket. |
---|
717 | + Any existing object of the same name will be replaced. |
---|
718 | + """ |
---|
719 | + |
---|
720 | + def get_object(self, object_name): |
---|
721 | + """ |
---|
722 | + Get an object from this bucket. |
---|
723 | + """ |
---|
724 | + |
---|
725 | + def head_object(self, object_name): |
---|
726 | + """ |
---|
727 | + Retrieve object metadata only. |
---|
728 | + """ |
---|
729 | + |
---|
730 | + def delete_object(self, object_name): |
---|
731 | + """ |
---|
732 | + Delete an object from this bucket. |
---|
733 | + Once deleted, there is no method to restore or undelete an object. |
---|
734 | + """ |
---|