source: trunk/src/allmydata/storage/server.py

Last change on this file was cab24e4, checked in by Itamar Turner-Trauring <itamar@…>, at 2023-06-14T18:12:39Z

Another service name issue.

  • Property mode set to 100644
File size: 37.8 KB
Line 
1"""
2Ported to Python 3.
3"""
4from __future__ import annotations
5
6from future.utils import bytes_to_native_str
7from typing import Iterable, Any
8
9import os, re
10
11from foolscap.api import Referenceable
12from foolscap.ipb import IRemoteReference
13from twisted.application import service
14from twisted.internet import reactor
15
16from zope.interface import implementer
17from allmydata.interfaces import RIStorageServer, IStatsProducer
18from allmydata.util import fileutil, idlib, log, time_format
19import allmydata # for __full_version__
20
21from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
22_pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
23from allmydata.storage.lease import LeaseInfo
24from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
25     create_mutable_sharefile
26from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
27from allmydata.storage.immutable import (
28    ShareFile, BucketWriter, BucketReader, FoolscapBucketWriter,
29    FoolscapBucketReader,
30)
31from allmydata.storage.crawler import BucketCountingCrawler
32from allmydata.storage.expirer import LeaseCheckingCrawler
33
34# storage/
35# storage/shares/incoming
36#   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
37#   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
38# storage/shares/$START/$STORAGEINDEX
39# storage/shares/$START/$STORAGEINDEX/$SHARENUM
40
41# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
42# base-32 chars).
43
44# $SHARENUM matches this regex:
# Share numbers are non-negative decimal integers; anything else in a
# bucket directory is not a share file.
NUM_RE = re.compile(r"^[0-9]+$")


# Number of seconds to add to expiration time on lease renewal.
# For now it's not actually configurable, but maybe someday.
DEFAULT_RENEWAL_TIME = 31 * 24 * 60 * 60
51
52
@implementer(IStatsProducer)
class StorageServer(service.MultiService):
    """
    Implement the business logic for the storage server.
    """
    # The type in Twisted for services is wrong in 22.10...
    # https://github.com/twisted/twisted/issues/10135
    name = 'storage'  # type: ignore[assignment]
    # Crawler class used for lease expiration; only the tests change this
    # to anything else.
    LeaseCheckerClass = LeaseCheckingCrawler
63
64    def __init__(self, storedir, nodeid, reserved_space=0,
65                 discard_storage=False, readonly_storage=False,
66                 stats_provider=None,
67                 expiration_enabled=False,
68                 expiration_mode="age",
69                 expiration_override_lease_duration=None,
70                 expiration_cutoff_date=None,
71                 expiration_sharetypes=("mutable", "immutable"),
72                 clock=reactor):
73        service.MultiService.__init__(self)
74        assert isinstance(nodeid, bytes)
75        assert len(nodeid) == 20
76        assert isinstance(nodeid, bytes)
77        self.my_nodeid = nodeid
78        self.storedir = storedir
79        sharedir = os.path.join(storedir, "shares")
80        fileutil.make_dirs(sharedir)
81        self.sharedir = sharedir
82        self.corruption_advisory_dir = os.path.join(storedir,
83                                                    "corruption-advisories")
84        fileutil.make_dirs(self.corruption_advisory_dir)
85        self.reserved_space = int(reserved_space)
86        self.no_storage = discard_storage
87        self.readonly_storage = readonly_storage
88        self.stats_provider = stats_provider
89        if self.stats_provider:
90            self.stats_provider.register_producer(self)
91        self.incomingdir = os.path.join(sharedir, 'incoming')
92        self._clean_incomplete()
93        fileutil.make_dirs(self.incomingdir)
94        log.msg("StorageServer created", facility="tahoe.storage")
95
96        if reserved_space:
97            if self.get_available_space() is None:
98                log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
99                        umin="0wZ27w", level=log.UNUSUAL)
100
101        self.latencies = {"allocate": [], # immutable
102                          "write": [],
103                          "close": [],
104                          "read": [],
105                          "get": [],
106                          "writev": [], # mutable
107                          "readv": [],
108                          "add-lease": [], # both
109                          "renew": [],
110                          "cancel": [],
111                          }
112        self.add_bucket_counter()
113
114        statefile = os.path.join(self.storedir, "lease_checker.state")
115        historyfile = os.path.join(self.storedir, "lease_checker.history")
116        klass = self.LeaseCheckerClass
117        self.lease_checker = klass(self, statefile, historyfile,
118                                   expiration_enabled, expiration_mode,
119                                   expiration_override_lease_duration,
120                                   expiration_cutoff_date,
121                                   expiration_sharetypes)
122        self.lease_checker.setServiceParent(self)
123        self._clock = clock
124
125        # Map in-progress filesystem path -> BucketWriter:
126        self._bucket_writers = {}  # type: Dict[str,BucketWriter]
127
128        # These callables will be called with BucketWriters that closed:
129        self._call_on_bucket_writer_close = []
130
131    def stopService(self):
132        # Cancel any in-progress uploads:
133        for bw in list(self._bucket_writers.values()):
134            bw.disconnected()
135        return service.MultiService.stopService(self)
136
137    def __repr__(self):
138        return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)
139
140    def have_shares(self):
141        # quick test to decide if we need to commit to an implicit
142        # permutation-seed or if we should use a new one
143        return bool(set(os.listdir(self.sharedir)) - set(["incoming"]))
144
145    def add_bucket_counter(self):
146        statefile = os.path.join(self.storedir, "bucket_counter.state")
147        self.bucket_counter = BucketCountingCrawler(self, statefile)
148        self.bucket_counter.setServiceParent(self)
149
150    def count(self, name, delta=1):
151        if self.stats_provider:
152            self.stats_provider.count("storage_server." + name, delta)
153
154    def add_latency(self, category, latency):
155        a = self.latencies[category]
156        a.append(latency)
157        if len(a) > 1000:
158            self.latencies[category] = a[-1000:]
159
160    def get_latencies(self):
161        """Return a dict, indexed by category, that contains a dict of
162        latency numbers for each category. If there are sufficient samples
163        for unambiguous interpretation, each dict will contain the
164        following keys: mean, 01_0_percentile, 10_0_percentile,
165        50_0_percentile (median), 90_0_percentile, 95_0_percentile,
166        99_0_percentile, 99_9_percentile.  If there are insufficient
167        samples for a given percentile to be interpreted unambiguously
168        that percentile will be reported as None. If no samples have been
169        collected for the given category, then that category name will
170        not be present in the return value. """
171        # note that Amazon's Dynamo paper says they use 99.9% percentile.
172        output = {}
173        for category in self.latencies:
174            if not self.latencies[category]:
175                continue
176            stats = {}
177            samples = self.latencies[category][:]
178            count = len(samples)
179            stats["samplesize"] = count
180            samples.sort()
181            if count > 1:
182                stats["mean"] = sum(samples) / count
183            else:
184                stats["mean"] = None
185
186            orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10),\
187                             (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10),\
188                             (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100),\
189                             (0.999, "99_9_percentile", 1000)]
190
191            for percentile, percentilestring, minnumtoobserve in orderstatlist:
192                if count >= minnumtoobserve:
193                    stats[percentilestring] = samples[int(percentile*count)]
194                else:
195                    stats[percentilestring] = None
196
197            output[category] = stats
198        return output
199
200    def log(self, *args, **kwargs):
201        if "facility" not in kwargs:
202            kwargs["facility"] = "tahoe.storage"
203        return log.msg(*args, **kwargs)
204
    def _clean_incomplete(self):
        # Discard any partially-uploaded shares left over from a previous
        # run; __init__ recreates the (empty) incoming directory afterwards.
        fileutil.rm_dir(self.incomingdir)
207
    def get_stats(self):
        """
        Return a flat dict of numeric statistics for the stats provider:
        allocation/reservation figures, per-category latency statistics,
        disk usage (when the platform exposes it), whether we currently
        accept immutable shares, and the last complete bucket count once
        the bucket-counting crawler has produced one.
        """
        # remember: RIStatsProvider requires that our return dict
        # contains numeric values.
        stats = { 'storage_server.allocated': self.allocated_size(), }
        stats['storage_server.reserved_space'] = self.reserved_space
        for category,ld in self.get_latencies().items():
            for name,v in ld.items():
                stats['storage_server.latencies.%s.%s' % (category, name)] = v

        try:
            disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
            writeable = disk['avail'] > 0

            # spacetime predictors should use disk_avail / (d(disk_used)/dt)
            stats['storage_server.disk_total'] = disk['total']
            stats['storage_server.disk_used'] = disk['used']
            stats['storage_server.disk_free_for_root'] = disk['free_for_root']
            stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
            stats['storage_server.disk_avail'] = disk['avail']
        except AttributeError:
            # presumably raised when the platform lacks a disk-stats API —
            # assume writeable in that case (TODO confirm against fileutil)
            writeable = True
        except EnvironmentError:
            log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
            writeable = False

        # read-only configuration overrides whatever the disk probe said
        if self.readonly_storage:
            stats['storage_server.disk_avail'] = 0
            writeable = False

        stats['storage_server.accepting_immutable_shares'] = int(writeable)
        s = self.bucket_counter.get_state()
        bucket_count = s.get("last-complete-bucket-count")
        if bucket_count:
            stats['storage_server.total_bucket_count'] = bucket_count
        return stats
243
244    def get_available_space(self):
245        """Returns available space for share storage in bytes, or None if no
246        API to get this information is available."""
247
248        if self.readonly_storage:
249            return 0
250        return fileutil.get_available_space(self.sharedir, self.reserved_space)
251
252    def allocated_size(self):
253        space = 0
254        for bw in self._bucket_writers.values():
255            space += bw.allocated_size()
256        return space
257
258    def get_version(self):
259        remaining_space = self.get_available_space()
260        if remaining_space is None:
261            # We're on a platform that has no API to get disk stats.
262            remaining_space = 2**64
263
264        # Unicode strings might be nicer, but for now sticking to bytes since
265        # this is what the wire protocol has always been.
266        version = { b"http://allmydata.org/tahoe/protocols/storage/v1" :
267                    { b"maximum-immutable-share-size": remaining_space,
268                      b"maximum-mutable-share-size": MAX_MUTABLE_SHARE_SIZE,
269                      b"available-space": remaining_space,
270                      b"tolerates-immutable-read-overrun": True,
271                      b"delete-mutable-shares-with-zero-length-writev": True,
272                      b"fills-holes-with-zero-bytes": True,
273                      b"prevents-read-past-end-of-share-data": True,
274                      },
275                    b"application-version": allmydata.__full_version__.encode("utf-8"),
276                    }
277        return version
278
    def allocate_buckets(self, storage_index,
                          renew_secret, cancel_secret,
                          sharenums, allocated_size,
                          owner_num=0, renew_leases=True):
        """
        Generic bucket allocation API.

        :param bool renew_leases: If and only if this is ``True`` then renew a
            secret-matching lease on (or, if none match, add a new lease to)
            existing shares in this bucket.  Any *new* shares are given a new
            lease regardless.

        :return: ``(alreadygot, bucketwriters)`` — the set of share numbers
            already held locally, and a dict mapping each newly-allocated
            share number to its ``BucketWriter``.
        """
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        start = self._clock.seconds()
        self.count("allocate")
        alreadygot = {}
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_dir = storage_index_to_dir(storage_index)
        si_s = si_b2a(storage_index)

        log.msg("storage: allocate_buckets %r" % si_s)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
        expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        max_space_per_bucket = allocated_size

        remaining_space = self.get_available_space()
        limited = remaining_space is not None
        if limited:
            # this is a bit conservative, since some of this allocated_size()
            # has already been written to disk, where it will show up in
            # get_available_space.
            remaining_space -= self.allocated_size()
        # self.readonly_storage causes remaining_space <= 0

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in self.get_shares(storage_index):
            alreadygot[shnum] = ShareFile(fn)
        if renew_leases:
            self._add_or_renew_leases(alreadygot.values(), lease_info)

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
            if os.path.exists(finalhome):
                # great! we already have it. easy.
                pass
            elif os.path.exists(incominghome):
                # For Foolscap we don't create BucketWriters for shnums that
                # have a partial share (in incoming/), so if a second upload
                # occurs while the first is still in progress, the second
                # uploader will use different storage servers.
                pass
            elif (not limited) or (remaining_space >= max_space_per_bucket):
                # ok! we need to create the new share file.
                bw = BucketWriter(self, incominghome, finalhome,
                                  max_space_per_bucket, lease_info,
                                  clock=self._clock)
                if self.no_storage:
                    # Really this should be done by having a separate class for
                    # this situation; see
                    # https://tahoe-lafs.org/trac/tahoe-lafs/ticket/3862
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._bucket_writers[incominghome] = bw
                if limited:
                    # reserve the promised space against later allocations
                    remaining_space -= max_space_per_bucket
            else:
                # bummer! not enough space to accept this bucket
                pass

        # only create the bucket directory if we accepted at least one share
        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

        self.add_latency("allocate", self._clock.seconds() - start)
        return set(alreadygot), bucketwriters
367
368    def _iter_share_files(self, storage_index):
369        for shnum, filename in self.get_shares(storage_index):
370            with open(filename, 'rb') as f:
371                header = f.read(32)
372            if MutableShareFile.is_valid_header(header):
373                sf = MutableShareFile(filename, self)
374                # note: if the share has been migrated, the renew_lease()
375                # call will throw an exception, with information to help the
376                # client update the lease.
377            elif ShareFile.is_valid_header(header):
378                sf = ShareFile(filename)
379            else:
380                continue # non-sharefile
381            yield sf
382
383    def add_lease(self, storage_index, renew_secret, cancel_secret, owner_num=1):
384        start = self._clock.seconds()
385        self.count("add-lease")
386        new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
387        lease_info = LeaseInfo(owner_num,
388                               renew_secret, cancel_secret,
389                               new_expire_time, self.my_nodeid)
390        self._add_or_renew_leases(
391            self._iter_share_files(storage_index),
392            lease_info,
393        )
394        self.add_latency("add-lease", self._clock.seconds() - start)
395        return None
396
397    def renew_lease(self, storage_index, renew_secret):
398        start = self._clock.seconds()
399        self.count("renew")
400        new_expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
401        found_buckets = False
402        for sf in self._iter_share_files(storage_index):
403            found_buckets = True
404            sf.renew_lease(renew_secret, new_expire_time)
405        self.add_latency("renew", self._clock.seconds() - start)
406        if not found_buckets:
407            raise IndexError("no such lease to renew")
408
409    def bucket_writer_closed(self, bw, consumed_size):
410        if self.stats_provider:
411            self.stats_provider.count('storage_server.bytes_added', consumed_size)
412        del self._bucket_writers[bw.incominghome]
413        for handler in self._call_on_bucket_writer_close:
414            handler(bw)
415
    def register_bucket_writer_close_handler(self, handler):
        """
        The handler will be called with any ``BucketWriter`` that closes.

        :param handler: a one-argument callable; it is invoked from
            ``bucket_writer_closed`` after the writer has been removed from
            the in-progress registry.
        """
        self._call_on_bucket_writer_close.append(handler)
421
422    def get_shares(self, storage_index) -> Iterable[tuple[int, str]]:
423        """
424        Return an iterable of (shnum, pathname) tuples for files that hold
425        shares for this storage_index. In each tuple, 'shnum' will always be
426        the integer form of the last component of 'pathname'.
427        """
428        storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
429        try:
430            for f in os.listdir(storagedir):
431                if NUM_RE.match(f):
432                    filename = os.path.join(storagedir, f)
433                    yield (int(f), filename)
434        except OSError:
435            # Commonly caused by there being no buckets at all.
436            pass
437
438    def get_buckets(self, storage_index):
439        """
440        Get ``BucketReaders`` for an immutable.
441        """
442        start = self._clock.seconds()
443        self.count("get")
444        si_s = si_b2a(storage_index)
445        log.msg("storage: get_buckets %r" % si_s)
446        bucketreaders = {} # k: sharenum, v: BucketReader
447        for shnum, filename in self.get_shares(storage_index):
448            bucketreaders[shnum] = BucketReader(self, filename,
449                                                storage_index, shnum)
450        self.add_latency("get", self._clock.seconds() - start)
451        return bucketreaders
452
453    def get_leases(self, storage_index):
454        """Provide an iterator that yields all of the leases attached to this
455        bucket. Each lease is returned as a LeaseInfo instance.
456
457        This method is not for client use.
458
459        :note: Only for immutable shares.
460        """
461        # since all shares get the same lease data, we just grab the leases
462        # from the first share
463        try:
464            shnum, filename = next(self.get_shares(storage_index))
465            sf = ShareFile(filename)
466            return sf.get_leases()
467        except StopIteration:
468            return iter([])
469
470    def get_slot_leases(self, storage_index):
471        """
472        This method is not for client use.
473
474        :note: Only for mutable shares.
475
476        :return: An iterable of the leases attached to this slot.
477        """
478        for _, share_filename in self.get_shares(storage_index):
479            share = MutableShareFile(share_filename)
480            return share.get_leases()
481        return []
482
483    def _collect_mutable_shares_for_storage_index(self, bucketdir, write_enabler, si_s):
484        """
485        Gather up existing mutable shares for the given storage index.
486
487        :param bytes bucketdir: The filesystem path containing shares for the
488            given storage index.
489
490        :param bytes write_enabler: The write enabler secret for the shares.
491
492        :param bytes si_s: The storage index in encoded (base32) form.
493
494        :raise BadWriteEnablerError: If the write enabler is not correct for
495            any of the collected shares.
496
497        :return dict[int, MutableShareFile]: The collected shares in a mapping
498            from integer share numbers to ``MutableShareFile`` instances.
499        """
500        shares = {}
501        if os.path.isdir(bucketdir):
502            # shares exist if there is a file for them
503            for sharenum_s in os.listdir(bucketdir):
504                try:
505                    sharenum = int(sharenum_s)
506                except ValueError:
507                    continue
508                filename = os.path.join(bucketdir, sharenum_s)
509                msf = MutableShareFile(filename, self)
510                msf.check_write_enabler(write_enabler, si_s)
511                shares[sharenum] = msf
512        return shares
513
514    def _evaluate_test_vectors(self, test_and_write_vectors, shares):
515        """
516        Execute test vectors against share data.
517
518        :param test_and_write_vectors: See
519            ``allmydata.interfaces.TestAndWriteVectorsForShares``.
520
521        :param dict[int, MutableShareFile] shares: The shares against which to
522            execute the vectors.
523
524        :return bool: ``True`` if and only if all of the test vectors succeed
525            against the given shares.
526        """
527        for sharenum in test_and_write_vectors:
528            (testv, datav, new_length) = test_and_write_vectors[sharenum]
529            if sharenum in shares:
530                if not shares[sharenum].check_testv(testv):
531                    self.log("testv failed: [%d]: %r" % (sharenum, testv))
532                    return False
533            else:
534                # compare the vectors against an empty share, in which all
535                # reads return empty strings.
536                if not EmptyShare().check_testv(testv):
537                    self.log("testv failed (empty): [%d] %r" % (sharenum,
538                                                                testv))
539                    return False
540        return True
541
542    def _evaluate_read_vectors(self, read_vector, shares):
543        """
544        Execute read vectors against share data.
545
546        :param read_vector: See ``allmydata.interfaces.ReadVector``.
547
548        :param dict[int, MutableShareFile] shares: The shares against which to
549            execute the vector.
550
551        :return dict[int, bytes]: The data read from the shares.
552        """
553        read_data = {}
554        for sharenum, share in shares.items():
555            read_data[sharenum] = share.readv(read_vector)
556        return read_data
557
558    def _evaluate_write_vectors(self, bucketdir, secrets, test_and_write_vectors, shares):
559        """
560        Execute write vectors against share data.
561
562        :param bytes bucketdir: The parent directory holding the shares.  This
563            is removed if the last share is removed from it.  If shares are
564            created, they are created in it.
565
566        :param secrets: A tuple of ``WriteEnablerSecret``,
567            ``LeaseRenewSecret``, and ``LeaseCancelSecret``.  These secrets
568            are used to initialize new shares.
569
570        :param test_and_write_vectors: See
571            ``allmydata.interfaces.TestAndWriteVectorsForShares``.
572
573        :param dict[int, MutableShareFile]: The shares against which to
574            execute the vectors.
575
576        :return dict[int, MutableShareFile]: The shares which still exist
577            after applying the vectors.
578        """
579        remaining_shares = {}
580
581        for sharenum in test_and_write_vectors:
582            (testv, datav, new_length) = test_and_write_vectors[sharenum]
583            if new_length == 0:
584                if sharenum in shares:
585                    shares[sharenum].unlink()
586            else:
587                if sharenum not in shares:
588                    # allocate a new share
589                    share = self._allocate_slot_share(bucketdir, secrets,
590                                                      sharenum,
591                                                      owner_num=0)
592                    shares[sharenum] = share
593                shares[sharenum].writev(datav, new_length)
594                remaining_shares[sharenum] = shares[sharenum]
595
596            if new_length == 0:
597                # delete bucket directories that exist but are empty.  They
598                # might not exist if a client showed up and asked us to
599                # truncate a share we weren't even holding.
600                if os.path.exists(bucketdir) and [] == os.listdir(bucketdir):
601                    os.rmdir(bucketdir)
602        return remaining_shares
603
604    def _make_lease_info(self, renew_secret, cancel_secret):
605        """
606        :return LeaseInfo: Information for a new lease for a share.
607        """
608        ownerid = 1 # TODO
609        expire_time = self._clock.seconds() + DEFAULT_RENEWAL_TIME
610        lease_info = LeaseInfo(ownerid,
611                               renew_secret, cancel_secret,
612                               expire_time, self.my_nodeid)
613        return lease_info
614
615    def _add_or_renew_leases(self, shares, lease_info):
616        """
617        Put the given lease onto the given shares.
618
619        :param Iterable[Union[MutableShareFile, ShareFile]] shares: The shares
620            to put the lease onto.
621
622        :param LeaseInfo lease_info: The lease to put on the shares.
623        """
624        for share in shares:
625            share.add_or_renew_lease(self.get_available_space(), lease_info)
626
    def slot_testv_and_readv_and_writev(  # type: ignore # warner/foolscap#78
            self,
            storage_index,
            secrets,
            test_and_write_vectors,
            read_vector,
            renew_leases=True,
    ):
        """
        Read data from shares and conditionally write some data to them.

        :param secrets: a tuple of (write_enabler, renew_secret,
            cancel_secret).

        :param bool renew_leases: If and only if this is ``True`` and the test
            vectors pass then shares mentioned in ``test_and_write_vectors``
            that still exist after the changes are made will also have a
            secret-matching lease renewed (or, if none match, a new lease
            added).

        See ``allmydata.interfaces.RIStorageServer`` for details about other
        parameters and return value.
        """
        start = self._clock.seconds()
        self.count("writev")
        si_s = si_b2a(storage_index)
        log.msg("storage: slot_writev %r" % si_s)
        si_dir = storage_index_to_dir(storage_index)
        (write_enabler, renew_secret, cancel_secret) = secrets
        bucketdir = os.path.join(self.sharedir, si_dir)

        # If collection succeeds we know the write_enabler is good for all
        # existing shares.
        shares = self._collect_mutable_shares_for_storage_index(
            bucketdir,
            write_enabler,
            si_s,
        )

        # Now evaluate test vectors.
        testv_is_good = self._evaluate_test_vectors(
            test_and_write_vectors,
            shares,
        )

        # now gather the read vectors, before we do any writes
        read_data = self._evaluate_read_vectors(
            read_vector,
            shares,
        )

        if testv_is_good:
            # now apply the write vectors
            remaining_shares = self._evaluate_write_vectors(
                bucketdir,
                secrets,
                test_and_write_vectors,
                shares,
            )
            if renew_leases:
                # only shares that survived the writes get the new lease
                lease_info = self._make_lease_info(renew_secret, cancel_secret)
                self._add_or_renew_leases(remaining_shares.values(), lease_info)

        # all done
        self.add_latency("writev", self._clock.seconds() - start)
        return (testv_is_good, read_data)
690
691    def _allocate_slot_share(self, bucketdir, secrets, sharenum,
692                             owner_num=0):
693        (write_enabler, renew_secret, cancel_secret) = secrets
694        my_nodeid = self.my_nodeid
695        fileutil.make_dirs(bucketdir)
696        filename = os.path.join(bucketdir, "%d" % sharenum)
697        share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
698                                         self)
699        return share
700
701    def enumerate_mutable_shares(self, storage_index: bytes) -> set[int]:
702        """Return all share numbers for the given mutable."""
703        si_dir = storage_index_to_dir(storage_index)
704        # shares exist if there is a file for them
705        bucketdir = os.path.join(self.sharedir, si_dir)
706        if not os.path.isdir(bucketdir):
707            return set()
708        result = set()
709        for sharenum_s in os.listdir(bucketdir):
710            try:
711                result.add(int(sharenum_s))
712            except ValueError:
713                continue
714        return result
715
    def slot_readv(self, storage_index, shares, readv):
        """
        Read vectors from the mutable shares of a slot.

        :param shares: collection of share numbers to read; an empty/falsy
            value means "read every share in this bucket".
        :param readv: see ``allmydata.interfaces.ReadVector``.

        :return: a dict mapping share number to the data read from it.
        """
        start = self._clock.seconds()
        self.count("readv")
        si_s = si_b2a(storage_index)
        lp = log.msg("storage: slot_readv %r %r" % (si_s, shares),
                     facility="tahoe.storage", level=log.OPERATIONAL)
        si_dir = storage_index_to_dir(storage_index)
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_dir)
        if not os.path.isdir(bucketdir):
            self.add_latency("readv", self._clock.seconds() - start)
            return {}
        datavs = {}
        for sharenum_s in os.listdir(bucketdir):
            try:
                sharenum = int(sharenum_s)
            except ValueError:
                continue
            # empty 'shares' means "all shares"
            if sharenum in shares or not shares:
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename, self)
                datavs[sharenum] = msf.readv(readv)
        log.msg("returning shares %s" % (list(datavs.keys()),),
                facility="tahoe.storage", level=log.NOISY, parent=lp)
        self.add_latency("readv", self._clock.seconds() - start)
        return datavs
742
743    def _share_exists(self, storage_index, shnum):
744        """
745        Check local share storage to see if a matching share exists.
746
747        :param bytes storage_index: The storage index to inspect.
748        :param int shnum: The share number to check for.
749
750        :return bool: ``True`` if a share with the given number exists at the
751            given storage index, ``False`` otherwise.
752        """
753        for existing_sharenum, ignored in self.get_shares(storage_index):
754            if existing_sharenum == shnum:
755                return True
756        return False
757
    def advise_corrupt_share(self, share_type, storage_index, shnum,
                             reason):
        """
        Record a client's claim that a share is corrupt by logging it and,
        space permitting, writing a freeform report file into the
        corruption-advisory directory.

        :param bytes share_type: the kind of share being reported.
        :param bytes storage_index: the binary storage index of the share.
        :param int shnum: the share number being reported.
        :param bytes reason: the client's explanation of the corruption.

        :return: ``None``
        """
        # Previously this had to be bytes for legacy protocol backwards
        # compatibility reasons. Now that Foolscap layer has been abstracted
        # out, we can probably refactor this to be unicode...
        assert isinstance(share_type, bytes)
        assert isinstance(reason, bytes), "%r is not bytes" % (reason,)

        si_s = si_b2a(storage_index)

        # Refuse to record reports about shares we do not actually hold.
        if not self._share_exists(storage_index, shnum):
            log.msg(
                format=(
                    "discarding client corruption claim for %(si)s/%(shnum)d "
                    "which I do not have"
                ),
                si=si_s,
                shnum=shnum,
            )
            return

        log.msg(format=("client claims corruption in (%(share_type)s) " +
                        "%(si)s-%(shnum)d: %(reason)s"),
                share_type=share_type, si=si_s, shnum=shnum, reason=reason,
                level=log.SCARY, umid="SGx2fA")

        report = render_corruption_report(share_type, si_s, shnum, reason)
        # Don't write the report if it would not fit in the remaining space.
        # NOTE(review): this compares the report's *character* count against
        # an available-space figure; exact only for single-byte characters —
        # confirm whether that approximation is intentional.
        if len(report) > self.get_available_space():
            return None

        now = time_format.iso_utc(sep="T")
        report_path = get_corruption_report_path(
            self.corruption_advisory_dir,
            now,
            si_s,
            shnum,
        )
        with open(report_path, "w", encoding="utf-8") as f:
            f.write(report)

        return None
799
800    def get_immutable_share_length(self, storage_index: bytes, share_number: int) -> int:
801        """Returns the length (in bytes) of an immutable."""
802        si_dir = storage_index_to_dir(storage_index)
803        path = os.path.join(self.sharedir, si_dir, str(share_number))
804        return ShareFile(path).get_length()
805
806    def get_mutable_share_length(self, storage_index: bytes, share_number: int) -> int:
807        """Returns the length (in bytes) of a mutable."""
808        si_dir = storage_index_to_dir(storage_index)
809        path = os.path.join(self.sharedir, si_dir, str(share_number))
810        if not os.path.exists(path):
811            raise KeyError("No such storage index or share number")
812        return MutableShareFile(path).get_length()
813
814
@implementer(RIStorageServer)
class FoolscapStorageServer(Referenceable):  # type: ignore # warner/foolscap#78
    """
    Exposes a ``StorageServer`` over Foolscap as an ``RIStorageServer``.

    BucketWriter lifetime is tied to the Foolscap connection: when a
    disconnection happens, the associated BucketWriters are aborted.
    """
    name = 'storage'

    def __init__(self, storage_server):  # type: (StorageServer) -> None
        self._server = storage_server

        # Maps each BucketWriter created via Foolscap to the (canary,
        # disconnect-notification marker) registered for it, so the
        # notification can be cancelled once the writer closes normally.
        self._bucket_writer_disconnect_markers : dict[BucketWriter, tuple[IRemoteReference, Any]] = {}

        self._server.register_bucket_writer_close_handler(self._bucket_writer_closed)

    def _bucket_writer_closed(self, bw):
        # The writer finished normally; stop watching for disconnects.
        entry = self._bucket_writer_disconnect_markers.pop(bw, None)
        if entry is not None:
            canary, marker = entry
            canary.dontNotifyOnDisconnect(marker)

    def remote_get_version(self):
        return self._server.get_version()

    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        """Foolscap-specific ``allocate_buckets()`` API."""
        alreadygot, writers = self._server.allocate_buckets(
            storage_index, renew_secret, cancel_secret, sharenums,
            allocated_size, owner_num=owner_num, renew_leases=True,
        )

        # Arrange for each BucketWriter to be aborted if the client
        # disconnects before closing it.
        for writer in writers.values():
            marker = canary.notifyOnDisconnect(writer.disconnected)
            self._bucket_writer_disconnect_markers[writer] = (canary, marker)

        # Hand back Foolscap-wrapped writers.
        wrapped = {}
        for shnum, writer in writers.items():
            wrapped[shnum] = FoolscapBucketWriter(writer)
        return alreadygot, wrapped

    def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
                         owner_num=1):
        return self._server.add_lease(storage_index, renew_secret, cancel_secret)

    def remote_renew_lease(self, storage_index, renew_secret):
        return self._server.renew_lease(storage_index, renew_secret)

    def remote_get_buckets(self, storage_index):
        # Wrap each BucketReader with its Foolscap adapter.
        readers = {}
        for shnum, bucket in self._server.get_buckets(storage_index).items():
            readers[shnum] = FoolscapBucketReader(bucket)
        return readers

    def remote_slot_testv_and_readv_and_writev(self, storage_index,
                                               secrets,
                                               test_and_write_vectors,
                                               read_vector):
        return self._server.slot_testv_and_readv_and_writev(
            storage_index,
            secrets,
            test_and_write_vectors,
            read_vector,
            renew_leases=True,
        )

    def remote_slot_readv(self, storage_index, shares, readv):
        return self._server.slot_readv(storage_index, shares, readv)

    def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
                                    reason):
        return self._server.advise_corrupt_share(share_type, storage_index, shnum,
                                                 reason)
896
897
# Template for the plain-text corruption reports written into the
# corruption-advisory directory; the placeholders are filled in by
# render_corruption_report().
CORRUPTION_REPORT_FORMAT = """\
report: Share Corruption
type: {type}
storage_index: {storage_index}
share_number: {share_number}

{reason}

"""
907
def render_corruption_report(share_type, si_s, shnum, reason):
    """
    Create a string that explains a corruption report using freeform text.

    :param bytes share_type: The type of the share which the report is about.

    :param bytes si_s: The encoded representation of the storage index which
        the report is about.

    :param int shnum: The share number which the report is about.

    :param bytes reason: The reason given by the client for the corruption
        report.

    :return str: The rendered report text.
    """
    # This module is Python 3-only (see the module docstring), so the
    # ``future.utils.bytes_to_native_str`` compatibility shim previously
    # used here reduces to a plain UTF-8 decode.
    return CORRUPTION_REPORT_FORMAT.format(
        type=share_type.decode("utf-8"),
        storage_index=si_s.decode("utf-8"),
        share_number=shnum,
        reason=reason.decode("utf-8"),
    )
928
def get_corruption_report_path(base_dir, now, si_s, shnum):
    """
    Determine the path to which a certain corruption report should be written.

    :param str base_dir: The directory beneath which to construct the path.

    :param str now: The time of the report.

    :param bytes si_s: The encoded representation of the storage index which
        the report is about.  (Callers pass the ``si_b2a()`` result, which is
        bytes; the previous docstring incorrectly said ``str``.)

    :param int shnum: The share number which the report is about.

    :return str: A path to which the report can be written.
    """
    filename = "%s--%s-%d" % (now, str(si_s, "utf-8"), shnum)
    # Windows can't handle colons in the filename, so strip any that appear
    # (in practice, from the timestamp).
    return os.path.join(base_dir, filename.replace(":", ""))
Note: See TracBrowser for help on using the repository browser.