source: trunk/src/allmydata/storage/server.py

Last change on this file was 7042442, checked in by meejah <meejah@…>, at 2024-08-08T23:10:30Z

should always be bytes

  • Property mode set to 100644
File size: 37.8 KB
Line 
1"""
2Ported to Python 3.
3"""
4from __future__ import annotations
5
6from typing import Iterable, Any
7
8import os, re
9
10from foolscap.api import Referenceable
11from foolscap.ipb import IRemoteReference
12from twisted.application import service
13from twisted.internet import reactor
14
15from zope.interface import implementer
16from allmydata.interfaces import RIStorageServer, IStatsProducer
17from allmydata.util import fileutil, idlib, log, time_format
18import allmydata # for __full_version__
19
20from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
21_pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
22from allmydata.storage.lease import LeaseInfo
23from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
24     create_mutable_sharefile
25from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
26from allmydata.storage.immutable import (
27    ShareFile, BucketWriter, BucketReader, FoolscapBucketWriter,
28    FoolscapBucketReader,
29)
30from allmydata.storage.crawler import BucketCountingCrawler
31from allmydata.storage.expirer import LeaseCheckingCrawler
32
33# storage/
34# storage/shares/incoming
35#   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
36#   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
37# storage/shares/$START/$STORAGEINDEX
38# storage/shares/$START/$STORAGEINDEX/$SHARENUM
39
40# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
41# base-32 chars).
42
# $SHARENUM (the share's filename) matches this regex: decimal digits only.
NUM_RE=re.compile("^[0-9]+$")


# Number of seconds to add to expiration time on lease renewal (31 days).
# For now it's not actually configurable, but maybe someday.
DEFAULT_RENEWAL_TIME = 31 * 24 * 60 * 60
50
51
52@implementer(IStatsProducer)
53class StorageServer(service.MultiService):
54    """
55    Implement the business logic for the storage server.
56    """
57    # The type in Twisted for services is wrong in 22.10...
58    # https://github.com/twisted/twisted/issues/10135
59    name = 'storage'  # type: ignore[assignment]
60    # only the tests change this to anything else
61    LeaseCheckerClass = LeaseCheckingCrawler
62
63    def __init__(self, storedir, nodeid, reserved_space=0,
64                 discard_storage=False, readonly_storage=False,
65                 stats_provider=None,
66                 expiration_enabled=False,
67                 expiration_mode="age",
68                 expiration_override_lease_duration=None,
69                 expiration_cutoff_date=None,
70                 expiration_sharetypes=("mutable", "immutable"),
71                 clock=reactor):
72        service.MultiService.__init__(self)
73        assert isinstance(nodeid, bytes)
74        assert len(nodeid) == 20
75        assert isinstance(nodeid, bytes)
76        self.my_nodeid = nodeid
77        self.storedir = storedir
78        sharedir = os.path.join(storedir, "shares")
79        fileutil.make_dirs(sharedir)
80        self.sharedir = sharedir
81        self.corruption_advisory_dir = os.path.join(storedir,
82                                                    "corruption-advisories")
83        fileutil.make_dirs(self.corruption_advisory_dir)
84        self.reserved_space = int(reserved_space)
85        self.no_storage = discard_storage
86        self.readonly_storage = readonly_storage
87        self.stats_provider = stats_provider
88        if self.stats_provider:
89            self.stats_provider.register_producer(self)
90        self.incomingdir = os.path.join(sharedir, 'incoming')
91        self._clean_incomplete()
92        fileutil.make_dirs(self.incomingdir)
93        log.msg("StorageServer created", facility="tahoe.storage")
94
95        if reserved_space:
96            if self.get_available_space() is None:
97                log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
98                        umin="0wZ27w", level=log.UNUSUAL)
99
100        self.latencies = {"allocate": [], # immutable
101                          "write": [],
102                          "close": [],
103                          "read": [],
104                          "get": [],
105                          "writev": [], # mutable
106                          "readv": [],
107                          "add-lease": [], # both
108                          "renew": [],
109                          "cancel": [],
110                          }
111        self.add_bucket_counter()
112
113        statefile = os.path.join(self.storedir, "lease_checker.state")
114        historyfile = os.path.join(self.storedir, "lease_checker.history")
115        klass = self.LeaseCheckerClass
116        self.lease_checker = klass(self, statefile, historyfile,
117                                   expiration_enabled, expiration_mode,
118                                   expiration_override_lease_duration,
119                                   expiration_cutoff_date,
120                                   expiration_sharetypes)
121        self.lease_checker.setServiceParent(self)
122        self._clock = clock
123
124        # Map in-progress filesystem path -> BucketWriter:
125        self._bucket_writers = {}  # type: Dict[str,BucketWriter]
126
127        # These callables will be called with BucketWriters that closed:
128        self._call_on_bucket_writer_close = []
129
130    def stopService(self):
131        # Cancel any in-progress uploads:
132        for bw in list(self._bucket_writers.values()):
133            bw.disconnected()
134        return service.MultiService.stopService(self)
135
136    def __repr__(self):
137        return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
138
139    def have_shares(self):
140        # quick test to decide if we need to commit to an implicit
141        # permutation-seed or if we should use a new one
142        return bool(set(os.listdir(self.sharedir)) - set(["incoming"]))
143
144    def add_bucket_counter(self):
145        statefile = os.path.join(self.storedir, "bucket_counter.state")
146        self.bucket_counter = BucketCountingCrawler(self, statefile)
147        self.bucket_counter.setServiceParent(self)
148
149    def count(self, name, delta=1):
150        if self.stats_provider:
151            self.stats_provider.count("storage_server." + name, delta)
152
153    def add_latency(self, category, latency):
154        a = self.latencies[category]
155        a.append(latency)
156        if len(a) > 1000:
157            self.latencies[category] = a[-1000:]
158
159    def get_latencies(self):
160        """Return a dict, indexed by category, that contains a dict of
161        latency numbers for each category. If there are sufficient samples
162        for unambiguous interpretation, each dict will contain the
163        following keys: mean, 01_0_percentile, 10_0_percentile,
164        50_0_percentile (median), 90_0_percentile, 95_0_percentile,
165        99_0_percentile, 99_9_percentile.  If there are insufficient
166        samples for a given percentile to be interpreted unambiguously
167        that percentile will be reported as None. If no samples have been
168        collected for the given category, then that category name will
169        not be present in the return value. """
170        # note that Amazon's Dynamo paper says they use 99.9% percentile.
171        output = {}
172        for category in self.latencies:
173            if not self.latencies[category]:
174                continue
175            stats = {}
176            samples = self.latencies[category][:]
177            count = len(samples)
178            stats["samplesize"] = count
179            samples.sort()
180            if count > 1:
181                stats["mean"] = sum(samples) / count
182            else:
183                stats["mean"] = None
184
185            orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10),\
186                             (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10),\
187                             (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100),\
188                             (0.999, "99_9_percentile", 1000)]
189
190            for percentile, percentilestring, minnumtoobserve in orderstatlist:
191                if count >= minnumtoobserve:
192                    stats[percentilestring] = samples[int(percentile*count)]
193                else:
194                    stats[percentilestring] = None
195
196            output[category] = stats
197        return output
198
199    def log(self, *args, **kwargs):
200        if "facility" not in kwargs:
201            kwargs["facility"] = "tahoe.storage"
202        return log.msg(*args, **kwargs)
203
    def _clean_incomplete(self):
        # Remove the whole incoming/ tree: it holds only partially-written
        # shares from a previous run (__init__ recreates the directory
        # immediately after calling this).
        fileutil.rm_dir(self.incomingdir)
206
    def get_stats(self):
        """
        Gather storage statistics (allocation, latencies, disk usage,
        writability, bucket count) into a flat dict keyed 'storage_server.*'.
        """
        # remember: RIStatsProvider requires that our return dict
        # contains numeric values.
        stats = { 'storage_server.allocated': self.allocated_size(), }
        stats['storage_server.reserved_space'] = self.reserved_space
        for category,ld in self.get_latencies().items():
            for name,v in ld.items():
                stats['storage_server.latencies.%s.%s' % (category, name)] = v

        try:
            disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
            writeable = disk['avail'] > 0

            # spacetime predictors should use disk_avail / (d(disk_used)/dt)
            stats['storage_server.disk_total'] = disk['total']
            stats['storage_server.disk_used'] = disk['used']
            stats['storage_server.disk_free_for_root'] = disk['free_for_root']
            stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
            stats['storage_server.disk_avail'] = disk['avail']
        except AttributeError:
            # NOTE(review): presumably raised when this platform has no
            # disk-stats API (cf. get_available_space) — confirm; in that
            # case assume we can still write.
            writeable = True
        except EnvironmentError:
            log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
            writeable = False

        if self.readonly_storage:
            # read-only servers advertise no room and accept nothing new
            stats['storage_server.disk_avail'] = 0
            writeable = False

        stats['storage_server.accepting_immutable_shares'] = int(writeable)
        s = self.bucket_counter.get_state()
        bucket_count = s.get("last-complete-bucket-count")
        if bucket_count:
            stats['storage_server.total_bucket_count'] = bucket_count
        return stats
242
243    def get_available_space(self):
244        """Returns available space for share storage in bytes, or None if no
245        API to get this information is available."""
246
247        if self.readonly_storage:
248            return 0
249        return fileutil.get_available_space(self.sharedir, self.reserved_space)
250
251    def allocated_size(self):
252        space = 0
253        for bw in self._bucket_writers.values():
254            space += bw.allocated_size()
255        return space
256
257    def get_version(self):
258        remaining_space = self.get_available_space()
259        if remaining_space is None:
260            # We're on a platform that has no API to get disk stats.
261            remaining_space = 2**64
262
263        # Unicode strings might be nicer, but for now sticking to bytes since
264        # this is what the wire protocol has always been.
265        version = { b"http://allmydata.org/tahoe/protocols/storage/v1" :
266                    { b"maximum-immutable-share-size": remaining_space,
267                      b"maximum-mutable-share-size": MAX_MUTABLE_SHARE_SIZE,
268                      b"available-space": remaining_space,
269                      b"tolerates-immutable-read-overrun": True,
270                      b"delete-mutable-shares-with-zero-length-writev": True,
271                      b"fills-holes-with-zero-bytes": True,
272                      b"prevents-read-past-end-of-share-data": True,
273                      },
274                    b"application-version": allmydata.__full_version__.encode("utf-8"),
275                    }
276        return version
277
    def allocate_buckets(self, storage_index,
                          renew_secret, cancel_secret,
                          sharenums, allocated_size,
                          owner_num=0, renew_leases=True):
        """
        Generic bucket allocation API.

        :param bytes storage_index: index of the bucket to allocate in.

        :param bytes renew_secret: lease-renewal secret recorded on any
            lease created or renewed here.

        :param bytes cancel_secret: lease-cancel secret recorded likewise.

        :param sharenums: the share numbers the client wants to upload.

        :param int allocated_size: maximum size, in bytes, of each new share.

        :param int owner_num: owner number for new leases (not client-settable;
            see the comment below).

        :param bool renew_leases: If and only if this is ``True`` then renew a
            secret-matching lease on (or, if none match, add a new lease to)
            existing shares in this bucket.  Any *new* shares are given a new
            lease regardless.

        :return: a tuple of (set of share numbers we already hold,
            dict mapping share number -> newly created BucketWriter).
        """
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        start = self._clock.seconds()
        self.count("allocate")
        alreadygot = {}
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_dir = storage_index_to_dir(storage_index)
        si_s = si_b2a(storage_index)

        log.msg("storage: allocate_buckets %r" % si_s)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
        expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        max_space_per_bucket = allocated_size

        remaining_space = self.get_available_space()
        limited = remaining_space is not None
        if limited:
            # this is a bit conservative, since some of this allocated_size()
            # has already been written to disk, where it will show up in
            # get_available_space.
            remaining_space -= self.allocated_size()
        # self.readonly_storage causes remaining_space <= 0

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in self.get_shares(storage_index):
            alreadygot[shnum] = ShareFile(fn)
        if renew_leases:
            self._add_or_renew_leases(alreadygot.values(), lease_info)

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
            if os.path.exists(finalhome):
                # great! we already have it. easy.
                pass
            elif os.path.exists(incominghome):
                # For Foolscap we don't create BucketWriters for shnums that
                # have a partial share (in incoming/), so if a second upload
                # occurs while the first is still in progress, the second
                # uploader will use different storage servers.
                pass
            elif (not limited) or (remaining_space >= max_space_per_bucket):
                # ok! we need to create the new share file.
                bw = BucketWriter(self, incominghome, finalhome,
                                  max_space_per_bucket, lease_info,
                                  clock=self._clock)
                if self.no_storage:
                    # Really this should be done by having a separate class for
                    # this situation; see
                    # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3862
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._bucket_writers[incominghome] = bw
                if limited:
                    # reserve the full bucket size against the budget
                    remaining_space -= max_space_per_bucket
            else:
                # bummer! not enough space to accept this bucket
                pass

        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

        self.add_latency("allocate", self._clock.seconds() - start)
        return set(alreadygot), bucketwriters
366
367    def _iter_share_files(self, storage_index):
368        for shnum, filename in self.get_shares(storage_index):
369            with open(filename, 'rb') as f:
370                header = f.read(32)
371            if MutableShareFile.is_valid_header(header):
372                sf = MutableShareFile(filename, self)
373                # note: if the share has been migrated, the renew_lease()
374                # call will throw an exception, with information to help the
375                # client update the lease.
376            elif ShareFile.is_valid_header(header):
377                sf = ShareFile(filename)
378            else:
379                continue # non-sharefile
380            yield sf
381
382    def add_lease(self, storage_index, renew_secret, cancel_secret, owner_num=1):
383        start = self._clock.seconds()
384        self.count("add-lease")
385        new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
386        lease_info = LeaseInfo(owner_num,
387                               renew_secret, cancel_secret,
388                               new_expire_time, self.my_nodeid)
389        self._add_or_renew_leases(
390            self._iter_share_files(storage_index),
391            lease_info,
392        )
393        self.add_latency("add-lease", self._clock.seconds() - start)
394        return None
395
396    def renew_lease(self, storage_index, renew_secret):
397        start = self._clock.seconds()
398        self.count("renew")
399        new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
400        found_buckets = False
401        for sf in self._iter_share_files(storage_index):
402            found_buckets = True
403            sf.renew_lease(renew_secret, new_expire_time)
404        self.add_latency("renew", self._clock.seconds() - start)
405        if not found_buckets:
406            raise IndexError("no such lease to renew")
407
408    def bucket_writer_closed(self, bw, consumed_size):
409        if self.stats_provider:
410            self.stats_provider.count('storage_server.bytes_added', consumed_size)
411        del self._bucket_writers[bw.incominghome]
412        for handler in self._call_on_bucket_writer_close:
413            handler(bw)
414
415    def register_bucket_writer_close_handler(self, handler):
416        """
417        The handler will be called with any ``BucketWriter`` that closes.
418        """
419        self._call_on_bucket_writer_close.append(handler)
420
421    def get_shares(self, storage_index) -> Iterable[tuple[int, str]]:
422        """
423        Return an iterable of (shnum, pathname) tuples for files that hold
424        shares for this storage_index. In each tuple, 'shnum' will always be
425        the integer form of the last component of 'pathname'.
426        """
427        storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
428        try:
429            for f in os.listdir(storagedir):
430                if NUM_RE.match(f):
431                    filename = os.path.join(storagedir, f)
432                    yield (int(f), filename)
433        except OSError:
434            # Commonly caused by there being no buckets at all.
435            pass
436
437    def get_buckets(self, storage_index):
438        """
439        Get ``BucketReaders`` for an immutable.
440        """
441        start = self._clock.seconds()
442        self.count("get")
443        si_s = si_b2a(storage_index)
444        log.msg("storage: get_buckets %r" % si_s)
445        bucketreaders = {} # k: sharenum, v: BucketReader
446        for shnum, filename in self.get_shares(storage_index):
447            bucketreaders[shnum] = BucketReader(self, filename,
448                                                storage_index, shnum)
449        self.add_latency("get", self._clock.seconds() - start)
450        return bucketreaders
451
452    def get_leases(self, storage_index):
453        """Provide an iterator that yields all of the leases attached to this
454        bucket. Each lease is returned as a LeaseInfo instance.
455
456        This method is not for client use.
457
458        :note: Only for immutable shares.
459        """
460        # since all shares get the same lease data, we just grab the leases
461        # from the first share
462        try:
463            shnum, filename = next(self.get_shares(storage_index))
464            sf = ShareFile(filename)
465            return sf.get_leases()
466        except StopIteration:
467            return iter([])
468
469    def get_slot_leases(self, storage_index):
470        """
471        This method is not for client use.
472
473        :note: Only for mutable shares.
474
475        :return: An iterable of the leases attached to this slot.
476        """
477        for _, share_filename in self.get_shares(storage_index):
478            share = MutableShareFile(share_filename)
479            return share.get_leases()
480        return []
481
482    def _collect_mutable_shares_for_storage_index(self, bucketdir, write_enabler, si_s):
483        """
484        Gather up existing mutable shares for the given storage index.
485
486        :param bytes bucketdir: The filesystem path containing shares for the
487            given storage index.
488
489        :param bytes write_enabler: The write enabler secret for the shares.
490
491        :param bytes si_s: The storage index in encoded (base32) form.
492
493        :raise BadWriteEnablerError: If the write enabler is not correct for
494            any of the collected shares.
495
496        :return dict[int, MutableShareFile]: The collected shares in a mapping
497            from integer share numbers to ``MutableShareFile`` instances.
498        """
499        shares = {}
500        if os.path.isdir(bucketdir):
501            # shares exist if there is a file for them
502            for sharenum_s in os.listdir(bucketdir):
503                try:
504                    sharenum = int(sharenum_s)
505                except ValueError:
506                    continue
507                filename = os.path.join(bucketdir, sharenum_s)
508                msf = MutableShareFile(filename, self)
509                msf.check_write_enabler(write_enabler, si_s)
510                shares[sharenum] = msf
511        return shares
512
513    def _evaluate_test_vectors(self, test_and_write_vectors, shares):
514        """
515        Execute test vectors against share data.
516
517        :param test_and_write_vectors: See
518            ``allmydata.interfaces.TestAndWriteVectorsForShares``.
519
520        :param dict[int, MutableShareFile] shares: The shares against which to
521            execute the vectors.
522
523        :return bool: ``True`` if and only if all of the test vectors succeed
524            against the given shares.
525        """
526        for sharenum in test_and_write_vectors:
527            (testv, datav, new_length) = test_and_write_vectors[sharenum]
528            if sharenum in shares:
529                if not shares[sharenum].check_testv(testv):
530                    self.log("testv failed: [%d]: %r" % (sharenum, testv))
531                    return False
532            else:
533                # compare the vectors against an empty share, in which all
534                # reads return empty strings.
535                if not EmptyShare().check_testv(testv):
536                    self.log("testv failed (empty): [%d] %r" % (sharenum,
537                                                                testv))
538                    return False
539        return True
540
541    def _evaluate_read_vectors(self, read_vector, shares):
542        """
543        Execute read vectors against share data.
544
545        :param read_vector: See ``allmydata.interfaces.ReadVector``.
546
547        :param dict[int, MutableShareFile] shares: The shares against which to
548            execute the vector.
549
550        :return dict[int, bytes]: The data read from the shares.
551        """
552        read_data = {}
553        for sharenum, share in shares.items():
554            read_data[sharenum] = share.readv(read_vector)
555        return read_data
556
    def _evaluate_write_vectors(self, bucketdir, secrets, test_and_write_vectors, shares):
        """
        Execute write vectors against share data.

        :param bytes bucketdir: The parent directory holding the shares.  This
            is removed if the last share is removed from it.  If shares are
            created, they are created in it.

        :param secrets: A tuple of ``WriteEnablerSecret``,
            ``LeaseRenewSecret``, and ``LeaseCancelSecret``.  These secrets
            are used to initialize new shares.

        :param test_and_write_vectors: See
            ``allmydata.interfaces.TestAndWriteVectorsForShares``.

        :param dict[int, MutableShareFile]: The shares against which to
            execute the vectors.  Note: this dict is mutated in place when
            new shares are allocated.

        :return dict[int, MutableShareFile]: The shares which still exist
            after applying the vectors.  Shares deleted via a zero
            ``new_length`` are excluded.
        """
        remaining_shares = {}

        for sharenum in test_and_write_vectors:
            (testv, datav, new_length) = test_and_write_vectors[sharenum]
            if new_length == 0:
                # a zero new_length requests deletion of the share
                if sharenum in shares:
                    shares[sharenum].unlink()
            else:
                if sharenum not in shares:
                    # allocate a new share
                    share = self._allocate_slot_share(bucketdir, secrets,
                                                      sharenum,
                                                      owner_num=0)
                    shares[sharenum] = share
                shares[sharenum].writev(datav, new_length)
                remaining_shares[sharenum] = shares[sharenum]

            if new_length == 0:
                # delete bucket directories that exist but are empty.  They
                # might not exist if a client showed up and asked us to
                # truncate a share we weren't even holding.
                if os.path.exists(bucketdir) and [] == os.listdir(bucketdir):
                    os.rmdir(bucketdir)
        return remaining_shares
602
603    def _make_lease_info(self, renew_secret, cancel_secret):
604        """
605        :return LeaseInfo: Information for a new lease for a share.
606        """
607        ownerid = 1 # TODO
608        expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
609        lease_info = LeaseInfo(ownerid,
610                               renew_secret, cancel_secret,
611                               expire_time, self.my_nodeid)
612        return lease_info
613
614    def _add_or_renew_leases(self, shares, lease_info):
615        """
616        Put the given lease onto the given shares.
617
618        :param Iterable[Union[MutableShareFile, ShareFile]] shares: The shares
619            to put the lease onto.
620
621        :param LeaseInfo lease_info: The lease to put on the shares.
622        """
623        for share in shares:
624            share.add_or_renew_lease(self.get_available_space(), lease_info)
625
    def slot_testv_and_readv_and_writev(  # type: ignore # warner/foolscap#78
            self,
            storage_index,
            secrets,
            test_and_write_vectors,
            read_vector,
            renew_leases=True,
    ):
        """
        Read data from shares and conditionally write some data to them.

        :param bool renew_leases: If and only if this is ``True`` and the test
            vectors pass then shares mentioned in ``test_and_write_vectors``
            that still exist after the changes are made will also have a
            secret-matching lease renewed (or, if none match, a new lease
            added).

        See ``allmydata.interfaces.RIStorageServer`` for details about other
        parameters and return value.
        """
        start = self._clock.seconds()
        self.count("writev")
        si_s = si_b2a(storage_index)
        log.msg("storage: slot_writev %r" % si_s)
        si_dir = storage_index_to_dir(storage_index)
        # secrets is (write-enabler, lease-renew, lease-cancel)
        (write_enabler, renew_secret, cancel_secret) = secrets
        bucketdir = os.path.join(self.sharedir, si_dir)

        # If collection succeeds we know the write_enabler is good for all
        # existing shares.
        shares = self._collect_mutable_shares_for_storage_index(
            bucketdir,
            write_enabler,
            si_s,
        )

        # Now evaluate test vectors.
        testv_is_good = self._evaluate_test_vectors(
            test_and_write_vectors,
            shares,
        )

        # now gather the read vectors, before we do any writes
        read_data = self._evaluate_read_vectors(
            read_vector,
            shares,
        )

        if testv_is_good:
            # now apply the write vectors
            remaining_shares = self._evaluate_write_vectors(
                bucketdir,
                secrets,
                test_and_write_vectors,
                shares,
            )
            if renew_leases:
                lease_info = self._make_lease_info(renew_secret, cancel_secret)
                self._add_or_renew_leases(remaining_shares.values(), lease_info)

        # all done; returns (tests-passed, data-read-before-writing)
        self.add_latency("writev", self._clock.seconds() - start)
        return (testv_is_good, read_data)
689
690    def _allocate_slot_share(self, bucketdir, secrets, sharenum,
691                             owner_num=0):
692        (write_enabler, renew_secret, cancel_secret) = secrets
693        my_nodeid = self.my_nodeid
694        fileutil.make_dirs(bucketdir)
695        filename = os.path.join(bucketdir, "%d" % sharenum)
696        share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
697                                         self)
698        return share
699
700    def enumerate_mutable_shares(self, storage_index: bytes) -> set[int]:
701        """Return all share numbers for the given mutable."""
702        si_dir = storage_index_to_dir(storage_index)
703        # shares exist if there is a file for them
704        bucketdir = os.path.join(self.sharedir, si_dir)
705        if not os.path.isdir(bucketdir):
706            return set()
707        result = set()
708        for sharenum_s in os.listdir(bucketdir):
709            try:
710                result.add(int(sharenum_s))
711            except ValueError:
712                continue
713        return result
714
    def slot_readv(self, storage_index, shares, readv):
        """
        Apply read vectors to shares of a mutable slot.

        :param shares: share numbers to read from; when empty, every share
            found on disk is read.
        :param readv: the read vectors applied to each selected share.

        :return: dict mapping share number -> data read from that share;
            empty when the bucket directory does not exist.
        """
        start = self._clock.seconds()
        self.count("readv")
        si_s = si_b2a(storage_index)
        lp = log.msg("storage: slot_readv %r %r" % (si_s, shares),
                     facility="tahoe.storage", level=log.OPERATIONAL)
        si_dir = storage_index_to_dir(storage_index)
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_dir)
        if not os.path.isdir(bucketdir):
            self.add_latency("readv", self._clock.seconds() - start)
            return {}
        datavs = {}
        for sharenum_s in os.listdir(bucketdir):
            try:
                sharenum = int(sharenum_s)
            except ValueError:
                # non-numeric entries are not share files
                continue
            if sharenum in shares or not shares:
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename, self)
                datavs[sharenum] = msf.readv(readv)
        log.msg("returning shares %s" % (list(datavs.keys()),),
                facility="tahoe.storage", level=log.NOISY, parent=lp)
        self.add_latency("readv", self._clock.seconds() - start)
        return datavs
741
742    def _share_exists(self, storage_index, shnum):
743        """
744        Check local share storage to see if a matching share exists.
745
746        :param bytes storage_index: The storage index to inspect.
747        :param int shnum: The share number to check for.
748
749        :return bool: ``True`` if a share with the given number exists at the
750            given storage index, ``False`` otherwise.
751        """
752        for existing_sharenum, ignored in self.get_shares(storage_index):
753            if existing_sharenum == shnum:
754                return True
755        return False
756
757    def advise_corrupt_share(self, share_type, storage_index, shnum,
758                             reason):
759        # Previously this had to be bytes for legacy protocol backwards
760        # compatibility reasons. Now that Foolscap layer has been abstracted
761        # out, we can probably refactor this to be unicode...
762        assert isinstance(share_type, bytes)
763        assert isinstance(reason, bytes), "%r is not bytes" % (reason,)
764
765        si_s = si_b2a(storage_index)
766
767        if not self._share_exists(storage_index, shnum):
768            log.msg(
769                format=(
770                    "discarding client corruption claim for %(si)s/%(shnum)d "
771                    "which I do not have"
772                ),
773                si=si_s,
774                shnum=shnum,
775            )
776            return
777
778        log.msg(format=("client claims corruption in (%(share_type)s) " +
779                        "%(si)s-%(shnum)d: %(reason)s"),
780                share_type=share_type, si=si_s, shnum=shnum, reason=reason,
781                level=log.SCARY, umid="SGx2fA")
782
783        report = render_corruption_report(share_type, si_s, shnum, reason)
784        if len(report) > self.get_available_space():
785            return None
786
787        now = time_format.iso_utc(sep="T")
788        report_path = get_corruption_report_path(
789            self.corruption_advisory_dir,
790            now,
791            si_s.decode("utf8"),
792            shnum,
793        )
794        with open(report_path, "w", encoding="utf-8") as f:
795            f.write(report)
796
797        return None
798
799    def get_immutable_share_length(self, storage_index: bytes, share_number: int) -> int:
800        """Returns the length (in bytes) of an immutable."""
801        si_dir = storage_index_to_dir(storage_index)
802        path = os.path.join(self.sharedir, si_dir, str(share_number))
803        return ShareFile(path).get_length()
804
805    def get_mutable_share_length(self, storage_index: bytes, share_number: int) -> int:
806        """Returns the length (in bytes) of a mutable."""
807        si_dir = storage_index_to_dir(storage_index)
808        path = os.path.join(self.sharedir, si_dir, str(share_number))
809        if not os.path.exists(path):
810            raise KeyError("No such storage index or share number")
811        return MutableShareFile(path).get_length()
812
813
@implementer(RIStorageServer)
class FoolscapStorageServer(Referenceable):  # type: ignore # warner/foolscap#78
    """
    A filesystem-based implementation of ``RIStorageServer``.

    For Foolscap, BucketWriter lifetime is tied to connection: when
    disconnection happens, the BucketWriters are removed.
    """
    name = 'storage'

    def __init__(self, storage_server):  # type: (StorageServer) -> None
        self._server = storage_server

        # For each BucketWriter created over Foolscap, remember the canary
        # and its disconnect marker so the notification can be cancelled
        # once the writer closes normally.
        self._bucket_writer_disconnect_markers : dict[BucketWriter, tuple[IRemoteReference, Any]] = {}

        self._server.register_bucket_writer_close_handler(self._bucket_writer_closed)

    def _bucket_writer_closed(self, bw):
        # A writer that closed cleanly no longer needs its disconnect hook.
        entry = self._bucket_writer_disconnect_markers.pop(bw, None)
        if entry is not None:
            canary, disconnect_marker = entry
            canary.dontNotifyOnDisconnect(disconnect_marker)

    def remote_get_version(self):
        return self._server.get_version()

    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        """Foolscap-specific ``allocate_buckets()`` API."""
        alreadygot, writers = self._server.allocate_buckets(
            storage_index, renew_secret, cancel_secret, sharenums,
            allocated_size, owner_num=owner_num, renew_leases=True,
        )

        wrapped = {}
        for shnum, writer in writers.items():
            # Abort the BucketWriter if disconnection happens, and wrap it
            # in the Foolscap adapter for the return value.
            marker = canary.notifyOnDisconnect(writer.disconnected)
            self._bucket_writer_disconnect_markers[writer] = (canary, marker)
            wrapped[shnum] = FoolscapBucketWriter(writer)

        return alreadygot, wrapped

    def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
                         owner_num=1):
        # NOTE: owner_num is accepted for protocol compatibility but is not
        # forwarded to the underlying server.
        return self._server.add_lease(storage_index, renew_secret, cancel_secret)

    def remote_renew_lease(self, storage_index, renew_secret):
        return self._server.renew_lease(storage_index, renew_secret)

    def remote_get_buckets(self, storage_index):
        readers = self._server.get_buckets(storage_index)
        return dict(
            (shnum, FoolscapBucketReader(reader))
            for (shnum, reader) in readers.items()
        )

    def remote_slot_testv_and_readv_and_writev(self, storage_index,
                                               secrets,
                                               test_and_write_vectors,
                                               read_vector):
        # Mutations arriving over Foolscap always renew leases.
        return self._server.slot_testv_and_readv_and_writev(
            storage_index, secrets, test_and_write_vectors, read_vector,
            renew_leases=True,
        )

    def remote_slot_readv(self, storage_index, shares, readv):
        return self._server.slot_readv(storage_index, shares, readv)

    def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
                                    reason):
        return self._server.advise_corrupt_share(share_type, storage_index,
                                                 shnum, reason)
895
896
# Freeform template for corruption advisories written to disk; this text is
# part of the on-disk report format, so it must not be altered casually.
CORRUPTION_REPORT_FORMAT = """\
report: Share Corruption
type: {type}
storage_index: {storage_index}
share_number: {share_number}

{reason}

"""

def render_corruption_report(
    share_type: bytes,
    si_s: bytes,
    shnum: int,
    reason: bytes
) -> str:
    """
    Create a string that explains a corruption report using freeform text.

    :param bytes share_type: The type of the share which the report is about.

    :param bytes si_s: The encoded representation of the storage index which
        the report is about.

    :param int shnum: The share number which the report is about.

    :param bytes reason: The reason given by the client for the corruption
        report.
    """
    fields = {
        "type": share_type.decode(),
        "storage_index": si_s.decode(),
        "share_number": shnum,
        "reason": reason.decode(),
    }
    return CORRUPTION_REPORT_FORMAT.format_map(fields)
932
def get_corruption_report_path(
    base_dir: str,
    now: str,
    si_s: str,
    shnum: int
) -> str:
    """
    Determine the path to which a certain corruption report should be written.

    :param str base_dir: The directory beneath which to construct the path.

    :param str now: The time of the report.

    :param str si_s: The encoded representation of the storage index which the
        report is about.

    :param int shnum: The share number which the report is about.

    :return str: A path to which the report can be written.
    """
    # Build "<timestamp>--<storage index>-<share number>", then strip any
    # colons because windows can't handle colons in the filename.
    filename = f"{now}--{si_s}-{shnum}".replace(":", "")
    return os.path.join(base_dir, filename)
Note: See TracBrowser for help on using the repository browser.