source: trunk/src/allmydata/storage/server.py @ 32f8062

Last change on this file since 32f8062 was 32f8062, checked in by Zooko O'Whielacronx <zooko@…>, at 2011-09-12T22:26:55Z

storage: more paranoid handling of bounds and palimpsests in mutable share files

  • storage server ignores requests to extend shares by sending a new_length
  • storage server fills exposed holes (created by sending a write vector whose offset begins after the end of the current data) with 0 to avoid "palimpsest" exposure of previous contents
  • storage server zeroes out lease info at the old location when moving it to a new location

ref. #1528

  • Property mode set to 100644
File size: 23.1 KB
Line 
1import os, re, weakref, struct, time
2
3from foolscap.api import Referenceable
4from twisted.application import service
5
6from zope.interface import implements
7from allmydata.interfaces import RIStorageServer, IStatsProducer
8from allmydata.util import fileutil, idlib, log, time_format
9import allmydata # for __full_version__
10
11from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
12_pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
13from allmydata.storage.lease import LeaseInfo
14from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
15     create_mutable_sharefile
16from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
17from allmydata.storage.crawler import BucketCountingCrawler
18from allmydata.storage.expirer import LeaseCheckingCrawler
19
20# storage/
21# storage/shares/incoming
22#   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
23#   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
24# storage/shares/$START/$STORAGEINDEX
25# storage/shares/$START/$STORAGEINDEX/$SHARENUM
26
27# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
28# base-32 chars).
29
30# $SHARENUM matches this regex:
31NUM_RE=re.compile("^[0-9]+$")
32
33
34
class StorageServer(service.MultiService, Referenceable):
    """A Foolscap-reachable storage server that keeps share files on local
    disk under 'storedir'. Lease information lives inside the share files
    themselves; background crawlers count buckets and expire leases.
    """
    implements(RIStorageServer, IStatsProducer)
    name = 'storage'  # twisted.application service name
    LeaseCheckerClass = LeaseCheckingCrawler  # overridable hook (e.g. for tests)
39
40    def __init__(self, storedir, nodeid, reserved_space=0,
41                 discard_storage=False, readonly_storage=False,
42                 stats_provider=None,
43                 expiration_enabled=False,
44                 expiration_mode="age",
45                 expiration_override_lease_duration=None,
46                 expiration_cutoff_date=None,
47                 expiration_sharetypes=("mutable", "immutable")):
48        service.MultiService.__init__(self)
49        assert isinstance(nodeid, str)
50        assert len(nodeid) == 20
51        self.my_nodeid = nodeid
52        self.storedir = storedir
53        sharedir = os.path.join(storedir, "shares")
54        fileutil.make_dirs(sharedir)
55        self.sharedir = sharedir
56        # we don't actually create the corruption-advisory dir until necessary
57        self.corruption_advisory_dir = os.path.join(storedir,
58                                                    "corruption-advisories")
59        self.reserved_space = int(reserved_space)
60        self.no_storage = discard_storage
61        self.readonly_storage = readonly_storage
62        self.stats_provider = stats_provider
63        if self.stats_provider:
64            self.stats_provider.register_producer(self)
65        self.incomingdir = os.path.join(sharedir, 'incoming')
66        self._clean_incomplete()
67        fileutil.make_dirs(self.incomingdir)
68        self._active_writers = weakref.WeakKeyDictionary()
69        log.msg("StorageServer created", facility="tahoe.storage")
70
71        if reserved_space:
72            if self.get_available_space() is None:
73                log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
74                        umin="0wZ27w", level=log.UNUSUAL)
75
76        self.latencies = {"allocate": [], # immutable
77                          "write": [],
78                          "close": [],
79                          "read": [],
80                          "get": [],
81                          "writev": [], # mutable
82                          "readv": [],
83                          "add-lease": [], # both
84                          "renew": [],
85                          "cancel": [],
86                          }
87        self.add_bucket_counter()
88
89        statefile = os.path.join(self.storedir, "lease_checker.state")
90        historyfile = os.path.join(self.storedir, "lease_checker.history")
91        klass = self.LeaseCheckerClass
92        self.lease_checker = klass(self, statefile, historyfile,
93                                   expiration_enabled, expiration_mode,
94                                   expiration_override_lease_duration,
95                                   expiration_cutoff_date,
96                                   expiration_sharetypes)
97        self.lease_checker.setServiceParent(self)
98
99    def __repr__(self):
100        return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
101
    def add_bucket_counter(self):
        # Start the crawler that periodically counts how many buckets we
        # hold; it persists its progress in bucket_counter.state so a
        # restart can resume mid-cycle.
        statefile = os.path.join(self.storedir, "bucket_counter.state")
        self.bucket_counter = BucketCountingCrawler(self, statefile)
        self.bucket_counter.setServiceParent(self)
106
107    def count(self, name, delta=1):
108        if self.stats_provider:
109            self.stats_provider.count("storage_server." + name, delta)
110
111    def add_latency(self, category, latency):
112        a = self.latencies[category]
113        a.append(latency)
114        if len(a) > 1000:
115            self.latencies[category] = a[-1000:]
116
117    def get_latencies(self):
118        """Return a dict, indexed by category, that contains a dict of
119        latency numbers for each category. If there are sufficient samples
120        for unambiguous interpretation, each dict will contain the
121        following keys: mean, 01_0_percentile, 10_0_percentile,
122        50_0_percentile (median), 90_0_percentile, 95_0_percentile,
123        99_0_percentile, 99_9_percentile.  If there are insufficient
124        samples for a given percentile to be interpreted unambiguously
125        that percentile will be reported as None. If no samples have been
126        collected for the given category, then that category name will
127        not be present in the return value. """
128        # note that Amazon's Dynamo paper says they use 99.9% percentile.
129        output = {}
130        for category in self.latencies:
131            if not self.latencies[category]:
132                continue
133            stats = {}
134            samples = self.latencies[category][:]
135            count = len(samples)
136            stats["samplesize"] = count
137            samples.sort()
138            if count > 1:
139                stats["mean"] = sum(samples) / count
140            else:
141                stats["mean"] = None
142
143            orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10),\
144                             (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10),\
145                             (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100),\
146                             (0.999, "99_9_percentile", 1000)]
147
148            for percentile, percentilestring, minnumtoobserve in orderstatlist:
149                if count >= minnumtoobserve:
150                    stats[percentilestring] = samples[int(percentile*count)]
151                else:
152                    stats[percentilestring] = None
153
154            output[category] = stats
155        return output
156
157    def log(self, *args, **kwargs):
158        if "facility" not in kwargs:
159            kwargs["facility"] = "tahoe.storage"
160        return log.msg(*args, **kwargs)
161
    def _clean_incomplete(self):
        # Discard any partially-uploaded shares left under incoming/ by a
        # previous run; __init__ re-creates the empty directory afterwards.
        fileutil.rm_dir(self.incomingdir)
164
    def get_stats(self):
        """Return a dict of numeric stats for the stats provider.

        Includes allocated/reserved space, flattened per-category latency
        statistics, disk usage (when the platform exposes an API for it),
        whether we are accepting new immutable shares, and the last
        complete bucket count if the bucket crawler has finished a cycle.
        """
        # remember: RIStatsProvider requires that our return dict
        # contains numeric values.
        stats = { 'storage_server.allocated': self.allocated_size(), }
        stats['storage_server.reserved_space'] = self.reserved_space
        for category,ld in self.get_latencies().items():
            for name,v in ld.items():
                stats['storage_server.latencies.%s.%s' % (category, name)] = v

        try:
            disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
            writeable = disk['avail'] > 0

            # spacetime predictors should use disk_avail / (d(disk_used)/dt)
            stats['storage_server.disk_total'] = disk['total']
            stats['storage_server.disk_used'] = disk['used']
            stats['storage_server.disk_free_for_root'] = disk['free_for_root']
            stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
            stats['storage_server.disk_avail'] = disk['avail']
        except AttributeError:
            # no disk-stats API on this platform: assume we can write
            writeable = True
        except EnvironmentError:
            log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
            writeable = False

        # a read-only configuration overrides whatever the disk probe said
        if self.readonly_storage:
            stats['storage_server.disk_avail'] = 0
            writeable = False

        stats['storage_server.accepting_immutable_shares'] = int(writeable)
        s = self.bucket_counter.get_state()
        bucket_count = s.get("last-complete-bucket-count")
        if bucket_count:
            stats['storage_server.total_bucket_count'] = bucket_count
        return stats
200
201    def get_available_space(self):
202        """Returns available space for share storage in bytes, or None if no
203        API to get this information is available."""
204
205        if self.readonly_storage:
206            return 0
207        return fileutil.get_available_space(self.sharedir, self.reserved_space)
208
209    def allocated_size(self):
210        space = 0
211        for bw in self._active_writers:
212            space += bw.allocated_size()
213        return space
214
215    def remote_get_version(self):
216        remaining_space = self.get_available_space()
217        if remaining_space is None:
218            # We're on a platform that has no API to get disk stats.
219            remaining_space = 2**64
220
221        version = { "http://allmydata.org/tahoe/protocols/storage/v1" :
222                    { "maximum-immutable-share-size": remaining_space,
223                      "tolerates-immutable-read-overrun": True,
224                      "delete-mutable-shares-with-zero-length-writev": True,
225                      "fills-holes-with-zero-bytes": True,
226                      "prevents-read-past-end-of-share-data": True,
227                      },
228                    "application-version": str(allmydata.__full_version__),
229                    }
230        return version
231
    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        """Allocate BucketWriters for the requested immutable shares.

        Returns (alreadygot, bucketwriters): 'alreadygot' is the set of
        share numbers we already hold for this storage index (not just the
        requested ones), and 'bucketwriters' maps each newly-allocated
        share number to its BucketWriter. A requested share appears in
        neither when an upload of it is already in progress, or when disk
        space is insufficient.
        """
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        start = time.time()
        self.count("allocate")
        alreadygot = set()
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_dir = storage_index_to_dir(storage_index)
        si_s = si_b2a(storage_index)

        log.msg("storage: allocate_buckets %s" % si_s)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
        expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        max_space_per_bucket = allocated_size

        remaining_space = self.get_available_space()
        limited = remaining_space is not None
        if limited:
            # this is a bit conservative, since some of this allocated_size()
            # has already been written to disk, where it will show up in
            # get_available_space.
            remaining_space -= self.allocated_size()
        # self.readonly_storage causes remaining_space <= 0

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in self._get_bucket_shares(storage_index):
            alreadygot.add(shnum)
            sf = ShareFile(fn)
            sf.add_or_renew_lease(lease_info)

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
            if os.path.exists(finalhome):
                # great! we already have it. easy.
                pass
            elif os.path.exists(incominghome):
                # Note that we don't create BucketWriters for shnums that
                # have a partial share (in incoming/), so if a second upload
                # occurs while the first is still in progress, the second
                # uploader will use different storage servers.
                pass
            elif (not limited) or (remaining_space >= max_space_per_bucket):
                # ok! we need to create the new share file.
                bw = BucketWriter(self, incominghome, finalhome,
                                  max_space_per_bucket, lease_info, canary)
                if self.no_storage:
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._active_writers[bw] = 1
                if limited:
                    remaining_space -= max_space_per_bucket
            else:
                # bummer! not enough space to accept this bucket
                pass

        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

        self.add_latency("allocate", time.time() - start)
        return alreadygot, bucketwriters
308
309    def _iter_share_files(self, storage_index):
310        for shnum, filename in self._get_bucket_shares(storage_index):
311            f = open(filename, 'rb')
312            header = f.read(32)
313            f.close()
314            if header[:32] == MutableShareFile.MAGIC:
315                sf = MutableShareFile(filename, self)
316                # note: if the share has been migrated, the renew_lease()
317                # call will throw an exception, with information to help the
318                # client update the lease.
319            elif header[:4] == struct.pack(">L", 1):
320                sf = ShareFile(filename)
321            else:
322                continue # non-sharefile
323            yield sf
324
325    def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
326                         owner_num=1):
327        start = time.time()
328        self.count("add-lease")
329        new_expire_time = time.time() + 31*24*60*60
330        lease_info = LeaseInfo(owner_num,
331                               renew_secret, cancel_secret,
332                               new_expire_time, self.my_nodeid)
333        for sf in self._iter_share_files(storage_index):
334            sf.add_or_renew_lease(lease_info)
335        self.add_latency("add-lease", time.time() - start)
336        return None
337
338    def remote_renew_lease(self, storage_index, renew_secret):
339        start = time.time()
340        self.count("renew")
341        new_expire_time = time.time() + 31*24*60*60
342        found_buckets = False
343        for sf in self._iter_share_files(storage_index):
344            found_buckets = True
345            sf.renew_lease(renew_secret, new_expire_time)
346        self.add_latency("renew", time.time() - start)
347        if not found_buckets:
348            raise IndexError("no such lease to renew")
349
    def bucket_writer_closed(self, bw, consumed_size):
        # Callback fired when a BucketWriter finishes (presumably invoked
        # from BucketWriter itself -- confirm in immutable.py). Accounts
        # the consumed bytes and retires the writer from the active set.
        if self.stats_provider:
            self.stats_provider.count('storage_server.bytes_added', consumed_size)
        del self._active_writers[bw]
354
    def _get_bucket_shares(self, storage_index):
        """Return a list of (shnum, pathname) tuples for files that hold
        shares for this storage_index. In each tuple, 'shnum' will always be
        the integer form of the last component of 'pathname'.

        (Actually a generator; yields nothing when the bucket directory
        cannot be listed.)
        """
        storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
        try:
            for f in os.listdir(storagedir):
                if NUM_RE.match(f):
                    filename = os.path.join(storagedir, f)
                    yield (int(f), filename)
        except OSError:
            # Commonly caused by there being no buckets at all.
            pass
368
369    def remote_get_buckets(self, storage_index):
370        start = time.time()
371        self.count("get")
372        si_s = si_b2a(storage_index)
373        log.msg("storage: get_buckets %s" % si_s)
374        bucketreaders = {} # k: sharenum, v: BucketReader
375        for shnum, filename in self._get_bucket_shares(storage_index):
376            bucketreaders[shnum] = BucketReader(self, filename,
377                                                storage_index, shnum)
378        self.add_latency("get", time.time() - start)
379        return bucketreaders
380
    def get_leases(self, storage_index):
        """Provide an iterator that yields all of the leases attached to this
        bucket. Each lease is returned as a LeaseInfo instance.

        This method is not for client use.
        """

        # since all shares get the same lease data, we just grab the leases
        # from the first share
        try:
            shnum, filename = self._get_bucket_shares(storage_index).next()
            sf = ShareFile(filename)
            return sf.get_leases()
        except StopIteration:
            # we hold no shares at all for this bucket: no leases either
            return iter([])
396
397    def remote_slot_testv_and_readv_and_writev(self, storage_index,
398                                               secrets,
399                                               test_and_write_vectors,
400                                               read_vector):
401        start = time.time()
402        self.count("writev")
403        si_s = si_b2a(storage_index)
404        log.msg("storage: slot_writev %s" % si_s)
405        si_dir = storage_index_to_dir(storage_index)
406        (write_enabler, renew_secret, cancel_secret) = secrets
407        # shares exist if there is a file for them
408        bucketdir = os.path.join(self.sharedir, si_dir)
409        shares = {}
410        if os.path.isdir(bucketdir):
411            for sharenum_s in os.listdir(bucketdir):
412                try:
413                    sharenum = int(sharenum_s)
414                except ValueError:
415                    continue
416                filename = os.path.join(bucketdir, sharenum_s)
417                msf = MutableShareFile(filename, self)
418                msf.check_write_enabler(write_enabler, si_s)
419                shares[sharenum] = msf
420        # write_enabler is good for all existing shares.
421
422        # Now evaluate test vectors.
423        testv_is_good = True
424        for sharenum in test_and_write_vectors:
425            (testv, datav, new_length) = test_and_write_vectors[sharenum]
426            if sharenum in shares:
427                if not shares[sharenum].check_testv(testv):
428                    self.log("testv failed: [%d]: %r" % (sharenum, testv))
429                    testv_is_good = False
430                    break
431            else:
432                # compare the vectors against an empty share, in which all
433                # reads return empty strings.
434                if not EmptyShare().check_testv(testv):
435                    self.log("testv failed (empty): [%d] %r" % (sharenum,
436                                                                testv))
437                    testv_is_good = False
438                    break
439
440        # now gather the read vectors, before we do any writes
441        read_data = {}
442        for sharenum, share in shares.items():
443            read_data[sharenum] = share.readv(read_vector)
444
445        ownerid = 1 # TODO
446        expire_time = time.time() + 31*24*60*60   # one month
447        lease_info = LeaseInfo(ownerid,
448                               renew_secret, cancel_secret,
449                               expire_time, self.my_nodeid)
450
451        if testv_is_good:
452            # now apply the write vectors
453            for sharenum in test_and_write_vectors:
454                (testv, datav, new_length) = test_and_write_vectors[sharenum]
455                if new_length == 0:
456                    if sharenum in shares:
457                        shares[sharenum].unlink()
458                else:
459                    if sharenum not in shares:
460                        # allocate a new share
461                        allocated_size = 2000 # arbitrary, really
462                        share = self._allocate_slot_share(bucketdir, secrets,
463                                                          sharenum,
464                                                          allocated_size,
465                                                          owner_num=0)
466                        shares[sharenum] = share
467                    shares[sharenum].writev(datav, new_length)
468                    # and update the lease
469                    shares[sharenum].add_or_renew_lease(lease_info)
470
471            if new_length == 0:
472                # delete empty bucket directories
473                if not os.listdir(bucketdir):
474                    os.rmdir(bucketdir)
475
476
477        # all done
478        self.add_latency("writev", time.time() - start)
479        return (testv_is_good, read_data)
480
481    def _allocate_slot_share(self, bucketdir, secrets, sharenum,
482                             allocated_size, owner_num=0):
483        (write_enabler, renew_secret, cancel_secret) = secrets
484        my_nodeid = self.my_nodeid
485        fileutil.make_dirs(bucketdir)
486        filename = os.path.join(bucketdir, "%d" % sharenum)
487        share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
488                                         self)
489        return share
490
491    def remote_slot_readv(self, storage_index, shares, readv):
492        start = time.time()
493        self.count("readv")
494        si_s = si_b2a(storage_index)
495        lp = log.msg("storage: slot_readv %s %s" % (si_s, shares),
496                     facility="tahoe.storage", level=log.OPERATIONAL)
497        si_dir = storage_index_to_dir(storage_index)
498        # shares exist if there is a file for them
499        bucketdir = os.path.join(self.sharedir, si_dir)
500        if not os.path.isdir(bucketdir):
501            self.add_latency("readv", time.time() - start)
502            return {}
503        datavs = {}
504        for sharenum_s in os.listdir(bucketdir):
505            try:
506                sharenum = int(sharenum_s)
507            except ValueError:
508                continue
509            if sharenum in shares or not shares:
510                filename = os.path.join(bucketdir, sharenum_s)
511                msf = MutableShareFile(filename, self)
512                datavs[sharenum] = msf.readv(readv)
513        log.msg("returning shares %s" % (datavs.keys(),),
514                facility="tahoe.storage", level=log.NOISY, parent=lp)
515        self.add_latency("readv", time.time() - start)
516        return datavs
517
518    def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
519                                    reason):
520        fileutil.make_dirs(self.corruption_advisory_dir)
521        now = time_format.iso_utc(sep="T")
522        si_s = si_b2a(storage_index)
523        # windows can't handle colons in the filename
524        fn = os.path.join(self.corruption_advisory_dir,
525                          "%s--%s-%d" % (now, si_s, shnum)).replace(":","")
526        f = open(fn, "w")
527        f.write("report: Share Corruption\n")
528        f.write("type: %s\n" % share_type)
529        f.write("storage_index: %s\n" % si_s)
530        f.write("share_number: %d\n" % shnum)
531        f.write("\n")
532        f.write(reason)
533        f.write("\n")
534        f.close()
535        log.msg(format=("client claims corruption in (%(share_type)s) " +
536                        "%(si)s-%(shnum)d: %(reason)s"),
537                share_type=share_type, si=si_s, shnum=shnum, reason=reason,
538                level=log.SCARY, umid="SGx2fA")
539        return None
Note: See TracBrowser for help on using the repository browser.