1 | hunk ./src/allmydata/interfaces.py 503 |
---|
2 | |
---|
3 | def get_used_space(): |
---|
4 | """ |
---|
5 | - Returns the amount of backend storage including overhead, in bytes, used |
---|
6 | - by this share. |
---|
7 | + Returns the amount of backend storage including overhead (which may |
---|
8 | + have to be estimated), in bytes, used by this share. |
---|
9 | """ |
---|
10 | |
---|
11 | def unlink(): |
---|
12 | hunk ./src/allmydata/storage/backends/s3/immutable.py 3 |
---|
13 | |
---|
14 | import struct |
---|
15 | +from cStringIO import StringIO |
---|
16 | |
---|
17 | from twisted.internet import defer |
---|
18 | |
---|
19 | hunk ./src/allmydata/storage/backends/s3/immutable.py 27 |
---|
20 | # data_length+0x0c: first lease. Each lease record is 72 bytes. |
---|
21 | |
---|
22 | |
---|
23 | -class ImmutableS3Share(object): |
---|
24 | - implements(IStoredShare) |
---|
25 | +class ImmutableS3ShareBase(object): |
---|
26 | + implements(IShareBase) # XXX |
---|
27 | |
---|
28 | sharetype = "immutable" |
---|
29 | LEASE_SIZE = struct.calcsize(">L32s32sL") # for compatibility |
---|
30 | hunk ./src/allmydata/storage/backends/s3/immutable.py 35 |
---|
31 | HEADER = ">LLL" |
---|
32 | HEADER_SIZE = struct.calcsize(HEADER) |
---|
33 | |
---|
34 | - def __init__(self, s3bucket, storageindex, shnum, max_size=None, data=None): |
---|
35 | - """ |
---|
36 | - If max_size is not None then I won't allow more than max_size to be written to me. |
---|
37 | - |
---|
38 | - Clients should use the load_immutable_s3_share and create_immutable_s3_share |
---|
39 | - factory functions rather than creating instances directly. |
---|
40 | - """ |
---|
41 | + def __init__(self, s3bucket, storageindex, shnum): |
---|
42 | self._s3bucket = s3bucket |
---|
43 | self._storageindex = storageindex |
---|
44 | self._shnum = shnum |
---|
45 | hunk ./src/allmydata/storage/backends/s3/immutable.py 39 |
---|
46 | - self._max_size = max_size |
---|
47 | - self._data = data |
---|
48 | self._key = get_s3_share_key(storageindex, shnum) |
---|
49 | hunk ./src/allmydata/storage/backends/s3/immutable.py 40 |
---|
50 | - self._data_offset = self.HEADER_SIZE |
---|
51 | - self._loaded = False |
---|
52 | |
---|
53 | def __repr__(self): |
---|
54 | hunk ./src/allmydata/storage/backends/s3/immutable.py 42 |
---|
55 | - return ("<ImmutableS3Share at %r>" % (self._key,)) |
---|
56 | - |
---|
57 | - def load(self): |
---|
58 | - if self._max_size is not None: # creating share |
---|
59 | - # The second field, which was the four-byte share data length in |
---|
60 | - # Tahoe-LAFS versions prior to 1.3.0, is not used; we always write 0. |
---|
61 | - # We also write 0 for the number of leases. |
---|
62 | - self._home.setContent(struct.pack(self.HEADER, 1, 0, 0) ) |
---|
63 | - self._end_offset = self.HEADER_SIZE + self._max_size |
---|
64 | - self._size = self.HEADER_SIZE |
---|
65 | - self._writes = [] |
---|
66 | - self._loaded = True |
---|
67 | - return defer.succeed(None) |
---|
68 | - |
---|
69 | - if self._data is None: |
---|
70 | - # If we don't already have the data, get it from S3. |
---|
71 | - d = self._s3bucket.get_object(self._key) |
---|
72 | - else: |
---|
73 | - d = defer.succeed(self._data) |
---|
74 | - |
---|
75 | - def _got_data(data): |
---|
76 | - self._data = data |
---|
77 | - header = self._data[:self.HEADER_SIZE] |
---|
78 | - (version, unused, num_leases) = struct.unpack(self.HEADER, header) |
---|
79 | - |
---|
80 | - if version != 1: |
---|
81 | - msg = "%r had version %d but we wanted 1" % (self, version) |
---|
82 | - raise UnknownImmutableContainerVersionError(msg) |
---|
83 | - |
---|
84 | - # We cannot write leases in share files, but allow them to be present |
---|
85 | - # in case a share file is copied from a disk backend, or in case we |
---|
86 | - # need them in future. |
---|
87 | - self._size = len(self._data) |
---|
88 | - self._end_offset = self._size - (num_leases * self.LEASE_SIZE) |
---|
89 | - self._loaded = True |
---|
90 | - d.addCallback(_got_data) |
---|
91 | - return d |
---|
92 | - |
---|
93 | - def close(self): |
---|
94 | - # This will briefly use memory equal to double the share size. |
---|
95 | - # We really want to stream writes to S3, but I don't think txaws supports that yet |
---|
96 | - # (and neither does IS3Bucket, since that's a thin wrapper over the txaws S3 API). |
---|
97 | - |
---|
98 | - self._data = "".join(self._writes) |
---|
99 | - del self._writes |
---|
100 | - self._s3bucket.put_object(self._key, self._data) |
---|
101 | - return defer.succeed(None) |
---|
102 | - |
---|
103 | - def get_used_space(self): |
---|
104 | - return self._size |
---|
105 | + return ("<%s at %r>" % (self.__class__.__name__, self._key,)) |
---|
106 | |
---|
107 | def get_storage_index(self): |
---|
108 | return self._storageindex |
---|
109 | hunk ./src/allmydata/storage/backends/s3/immutable.py 53 |
---|
110 | def get_shnum(self): |
---|
111 | return self._shnum |
---|
112 | |
---|
113 | - def unlink(self): |
---|
114 | - self._data = None |
---|
115 | - self._writes = None |
---|
116 | - return self._s3bucket.delete_object(self._key) |
---|
117 | +class ImmutableS3ShareForWriting(ImmutableS3ShareBase): |
---|
118 | + implements(IShareForWriting) # XXX |
---|
119 | + |
---|
120 | + def __init__(self, s3bucket, storageindex, shnum, max_size): |
---|
121 | + """ |
---|
122 | + I won't allow more than max_size to be written to me. |
---|
123 | + """ |
---|
124 | + precondition(isinstance(max_size, (int, long)), max_size) |
---|
125 | + ImmutableS3ShareBase.__init__(self, s3bucket, storageindex, shnum) |
---|
126 | + self._max_size = max_size |
---|
127 | + self._end_offset = self.HEADER_SIZE + self._max_size |
---|
128 | + |
---|
129 | + self._buf = StringIO() |
---|
130 | + # The second field, which was the four-byte share data length in |
---|
131 | + # Tahoe-LAFS versions prior to 1.3.0, is not used; we always write 0. |
---|
132 | + # We also write 0 for the number of leases. |
---|
133 | + self._buf.write(struct.pack(self.HEADER, 1, 0, 0) ) |
---|
134 | + |
---|
135 | + def close(self): |
---|
136 | + # We really want to stream writes to S3, but txaws doesn't support |
---|
137 | + # that yet (and neither does IS3Bucket, since that's a thin wrapper |
---|
138 | + # over the txaws S3 API). See |
---|
139 | + # https://bugs.launchpad.net/txaws/+bug/767205 and |
---|
140 | + # https://bugs.launchpad.net/txaws/+bug/783801 |
---|
141 | + return self._s3bucket.put_object(self._key, self._buf.getvalue()) |
---|
142 | |
---|
143 | def get_allocated_size(self): |
---|
144 | return self._max_size |
---|
145 | hunk ./src/allmydata/storage/backends/s3/immutable.py 82 |
---|
146 | |
---|
147 | - def get_size(self): |
---|
148 | - return self._size |
---|
149 | + def write_share_data(self, offset, data): |
---|
150 | + self._buf.seek(offset) |
---|
151 | + self._buf.write(data) |
---|
152 | + if self._buf.tell() > self._max_size: |
---|
153 | + raise DataTooLargeError(self._max_size, offset, len(data)) |
---|
154 | + return defer.succeed(None) |
---|
155 | + |
---|
156 | +class ImmutableS3ShareForReading(ImmutableS3ShareBase): |
---|
157 | + implements(IStoredShareForReading) # XXX |
---|
158 | + |
---|
159 | + def __init__(self, s3bucket, storageindex, shnum, data): |
---|
160 | + ImmutableS3ShareBase.__init__(self, s3bucket, storageindex, shnum) |
---|
161 | + self._data = data |
---|
162 | + |
---|
163 | + header = self._data[:self.HEADER_SIZE] |
---|
164 | + (version, unused, num_leases) = struct.unpack(self.HEADER, header) |
---|
165 | |
---|
166 | hunk ./src/allmydata/storage/backends/s3/immutable.py 99 |
---|
167 | - def get_data_length(self): |
---|
168 | - return self._end_offset - self._data_offset |
---|
169 | + if version != 1: |
---|
170 | + msg = "%r had version %d but we wanted 1" % (self, version) |
---|
171 | + raise UnknownImmutableContainerVersionError(msg) |
---|
172 | + |
---|
173 | + # We cannot write leases in share files, but allow them to be present |
---|
174 | + # in case a share file is copied from a disk backend, or in case we |
---|
175 | + # need them in future. |
---|
176 | + self._end_offset = len(self._data) - (num_leases * self.LEASE_SIZE) |
---|
177 | |
---|
178 | def readv(self, readv): |
---|
179 | datav = [] |
---|
180 | hunk ./src/allmydata/storage/backends/s3/immutable.py 119 |
---|
181 | |
---|
182 | # Reads beyond the end of the data are truncated. Reads that start |
---|
183 | # beyond the end of the data return an empty string. |
---|
184 | - seekpos = self._data_offset+offset |
---|
185 | + seekpos = self.HEADER_SIZE+offset |
---|
186 | actuallength = max(0, min(length, self._end_offset-seekpos)) |
---|
187 | if actuallength == 0: |
---|
188 | return defer.succeed("") |
---|
189 | hunk ./src/allmydata/storage/backends/s3/immutable.py 124 |
---|
190 | return defer.succeed(self._data[offset:offset+actuallength]) |
---|
191 | - |
---|
192 | - def write_share_data(self, offset, data): |
---|
193 | - length = len(data) |
---|
194 | - precondition(offset >= self._size, "offset = %r, size = %r" % (offset, self._size)) |
---|
195 | - if self._max_size is not None and offset+length > self._max_size: |
---|
196 | - raise DataTooLargeError(self._max_size, offset, length) |
---|
197 | - |
---|
198 | - if offset > self._size: |
---|
199 | - self._writes.append("\x00" * (offset - self._size)) |
---|
200 | - self._writes.append(data) |
---|
201 | - self._size = offset + len(data) |
---|
202 | - return defer.succeed(None) |
---|
203 | - |
---|
204 | - def add_lease(self, lease_info): |
---|
205 | - pass |
---|
206 | - |
---|
207 | - |
---|
208 | -def load_immutable_s3_share(s3bucket, storageindex, shnum, data=None): |
---|
209 | - return ImmutableS3Share(s3bucket, storageindex, shnum, data=data).load() |
---|
210 | - |
---|
211 | -def create_immutable_s3_share(s3bucket, storageindex, shnum, max_size): |
---|
212 | - return ImmutableS3Share(s3bucket, storageindex, shnum, max_size=max_size).load() |
---|
213 | hunk ./src/allmydata/storage/backends/s3/s3_backend.py 9 |
---|
214 | from allmydata.storage.common import si_a2b |
---|
215 | from allmydata.storage.bucket import BucketWriter |
---|
216 | from allmydata.storage.backends.base import Backend, ShareSet |
---|
217 | -from allmydata.storage.backends.s3.immutable import load_immutable_s3_share, create_immutable_s3_share |
---|
218 | +from allmydata.storage.backends.s3.immutable import ImmutableS3ShareForReading, ImmutableS3ShareForWriting |
---|
219 | from allmydata.storage.backends.s3.mutable import load_mutable_s3_share, create_mutable_s3_share |
---|
220 | from allmydata.storage.backends.s3.s3_common import get_s3_share_key, NUM_RE |
---|
221 | from allmydata.mutable.layout import MUTABLE_MAGIC |
---|
222 | hunk ./src/allmydata/storage/backends/s3/s3_backend.py 107 |
---|
223 | return load_mutable_s3_share(self._s3bucket, self._storageindex, shnum, data=data) |
---|
224 | else: |
---|
225 | # assume it's immutable |
---|
226 | - return load_immutable_s3_share(self._s3bucket, self._storageindex, shnum, data=data) |
---|
227 | + return ImmutableS3ShareForReading(self._s3bucket, self._storageindex, shnum, data=data) |
---|
228 | d.addCallback(_make_share) |
---|
229 | return d |
---|
230 | |
---|
231 | hunk ./src/allmydata/storage/backends/s3/s3_backend.py 116 |
---|
232 | return False |
---|
233 | |
---|
234 | def make_bucket_writer(self, storageserver, shnum, max_space_per_bucket, lease_info, canary): |
---|
235 | - d = create_immutable_s3_share(self._s3bucket, self.get_storage_index(), shnum, |
---|
236 | + immsh = ImmutableS3ShareForWriting(self._s3bucket, self.get_storage_index(), shnum, |
---|
237 | max_size=max_space_per_bucket) |
---|
238 | hunk ./src/allmydata/storage/backends/s3/s3_backend.py 118 |
---|
239 | - def _created(immsh): |
---|
240 | - return BucketWriter(storageserver, immsh, lease_info, canary) |
---|
241 | - d.addCallback(_created) |
---|
242 | - return d |
---|
243 | + return defer.succeed(BucketWriter(storageserver, immsh, lease_info, canary)) |
---|
244 | |
---|
245 | def _create_mutable_share(self, storageserver, shnum, write_enabler): |
---|
246 | serverid = storageserver.get_serverid() |
---|