source: trunk/src/allmydata/storage/server.py

Last change on this file was c5a426b, checked in by Itamar Turner-Trauring <itamar@…>, at 2021-02-12T16:47:11Z

More unicode-of-bytes fixes.

  • Property mode set to 100644
File size: 29.7 KB
1"""
2Ported to Python 3.
3"""
4from __future__ import division
5from __future__ import absolute_import
6from __future__ import print_function
7from __future__ import unicode_literals
8
9from future.utils import bytes_to_native_str, PY2
10if PY2:
11    # Omit open() to get native behavior where open("w") always accepts native
12    # strings. Omit bytes so we don't leak future's custom bytes.
13    from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, pow, round, super, dict, list, object, range, str, max, min  # noqa: F401
14
15
16import os, re, struct, time
17import weakref
18import six
19
20from foolscap.api import Referenceable
21from twisted.application import service
22
23from zope.interface import implementer
24from allmydata.interfaces import RIStorageServer, IStatsProducer
25from allmydata.util import fileutil, idlib, log, time_format
26import allmydata # for __full_version__
27
28from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir
29_pyflakes_hush = [si_b2a, si_a2b, storage_index_to_dir] # re-exported
30from allmydata.storage.lease import LeaseInfo
31from allmydata.storage.mutable import MutableShareFile, EmptyShare, \
32     create_mutable_sharefile
33from allmydata.mutable.layout import MAX_MUTABLE_SHARE_SIZE
34from allmydata.storage.immutable import ShareFile, BucketWriter, BucketReader
35from allmydata.storage.crawler import BucketCountingCrawler
36from allmydata.storage.expirer import LeaseCheckingCrawler
37
38# storage/
39# storage/shares/incoming
40#   incoming/ holds temp dirs named $START/$STORAGEINDEX/$SHARENUM which will
41#   be moved to storage/shares/$START/$STORAGEINDEX/$SHARENUM upon success
42# storage/shares/$START/$STORAGEINDEX
43# storage/shares/$START/$STORAGEINDEX/$SHARENUM
44
45# Where "$START" denotes the first 10 bits worth of $STORAGEINDEX (that's 2
46# base-32 chars).
47
48# $SHARENUM matches this regex:
49NUM_RE=re.compile("^[0-9]+$")
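# For example, share number 2 of a (hypothetical) storage index whose base-32
# encoding starts with "aq" would be written to
# storage/shares/incoming/aq/$STORAGEINDEX/2 while it is being uploaded, then
# moved to storage/shares/aq/$STORAGEINDEX/2 once the upload completes.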


@implementer(RIStorageServer, IStatsProducer)
class StorageServer(service.MultiService, Referenceable):
    name = 'storage'
    LeaseCheckerClass = LeaseCheckingCrawler

    def __init__(self, storedir, nodeid, reserved_space=0,
                 discard_storage=False, readonly_storage=False,
                 stats_provider=None,
                 expiration_enabled=False,
                 expiration_mode="age",
                 expiration_override_lease_duration=None,
                 expiration_cutoff_date=None,
                 expiration_sharetypes=("mutable", "immutable")):
        service.MultiService.__init__(self)
        assert isinstance(nodeid, bytes)
        assert len(nodeid) == 20
        self.my_nodeid = nodeid
        self.storedir = storedir
        sharedir = os.path.join(storedir, "shares")
        fileutil.make_dirs(sharedir)
        self.sharedir = sharedir
        # we don't actually create the corruption-advisory dir until necessary
        self.corruption_advisory_dir = os.path.join(storedir,
                                                    "corruption-advisories")
        self.reserved_space = int(reserved_space)
        self.no_storage = discard_storage
        self.readonly_storage = readonly_storage
        self.stats_provider = stats_provider
        if self.stats_provider:
            self.stats_provider.register_producer(self)
        self.incomingdir = os.path.join(sharedir, 'incoming')
        self._clean_incomplete()
        fileutil.make_dirs(self.incomingdir)
        self._active_writers = weakref.WeakKeyDictionary()
        log.msg("StorageServer created", facility="tahoe.storage")

        if reserved_space:
            if self.get_available_space() is None:
                log.msg("warning: [storage]reserved_space= is set, but this platform does not support an API to get disk statistics (statvfs(2) or GetDiskFreeSpaceEx), so this reservation cannot be honored",
                        umid="0wZ27w", level=log.UNUSUAL)

        self.latencies = {"allocate": [], # immutable
                          "write": [],
                          "close": [],
                          "read": [],
                          "get": [],
                          "writev": [], # mutable
                          "readv": [],
                          "add-lease": [], # both
                          "renew": [],
                          "cancel": [],
                          }
        self.add_bucket_counter()

        statefile = os.path.join(self.storedir, "lease_checker.state")
        historyfile = os.path.join(self.storedir, "lease_checker.history")
        klass = self.LeaseCheckerClass
        self.lease_checker = klass(self, statefile, historyfile,
                                   expiration_enabled, expiration_mode,
                                   expiration_override_lease_duration,
                                   expiration_cutoff_date,
                                   expiration_sharetypes)
        self.lease_checker.setServiceParent(self)

    def __repr__(self):
        return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),)

    def have_shares(self):
        # quick test to decide if we need to commit to an implicit
        # permutation-seed or if we should use a new one
        return bool(set(os.listdir(self.sharedir)) - set(["incoming"]))

    def add_bucket_counter(self):
        statefile = os.path.join(self.storedir, "bucket_counter.state")
        self.bucket_counter = BucketCountingCrawler(self, statefile)
        self.bucket_counter.setServiceParent(self)

    def count(self, name, delta=1):
        if self.stats_provider:
            self.stats_provider.count("storage_server." + name, delta)

    def add_latency(self, category, latency):
        a = self.latencies[category]
        a.append(latency)
        if len(a) > 1000:
            self.latencies[category] = a[-1000:]

    def get_latencies(self):
        """Return a dict, indexed by category, that contains a dict of
        latency numbers for each category. If there are sufficient samples
        for unambiguous interpretation, each dict will contain the
        following keys: mean, 01_0_percentile, 10_0_percentile,
        50_0_percentile (median), 90_0_percentile, 95_0_percentile,
        99_0_percentile, 99_9_percentile.  If there are insufficient
        samples for a given percentile to be interpreted unambiguously
        that percentile will be reported as None. If no samples have been
        collected for the given category, then that category name will
        not be present in the return value. """
        # note that Amazon's Dynamo paper says they use 99.9% percentile.
        output = {}
        for category in self.latencies:
            if not self.latencies[category]:
                continue
            stats = {}
            samples = self.latencies[category][:]
            count = len(samples)
            stats["samplesize"] = count
            samples.sort()
            if count > 1:
                stats["mean"] = sum(samples) / count
            else:
                stats["mean"] = None

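            # Each entry below is (fraction, reported key, minimum number of
            # samples required before that percentile is reported; otherwise
            # it is reported as None).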
            orderstatlist = [(0.01, "01_0_percentile", 100), (0.1, "10_0_percentile", 10),\
                             (0.50, "50_0_percentile", 10), (0.90, "90_0_percentile", 10),\
                             (0.95, "95_0_percentile", 20), (0.99, "99_0_percentile", 100),\
                             (0.999, "99_9_percentile", 1000)]

            for percentile, percentilestring, minnumtoobserve in orderstatlist:
                if count >= minnumtoobserve:
                    stats[percentilestring] = samples[int(percentile*count)]
                else:
                    stats[percentilestring] = None

            output[category] = stats
        return output

    def log(self, *args, **kwargs):
        if "facility" not in kwargs:
            kwargs["facility"] = "tahoe.storage"
        return log.msg(*args, **kwargs)

    def _clean_incomplete(self):
        fileutil.rm_dir(self.incomingdir)

    def get_stats(self):
        # remember: RIStatsProvider requires that our return dict
        # contains numeric values.
        stats = { 'storage_server.allocated': self.allocated_size(), }
        stats['storage_server.reserved_space'] = self.reserved_space
        for category,ld in self.get_latencies().items():
            for name,v in ld.items():
                stats['storage_server.latencies.%s.%s' % (category, name)] = v

        try:
            disk = fileutil.get_disk_stats(self.sharedir, self.reserved_space)
            writeable = disk['avail'] > 0

            # spacetime predictors should use disk_avail / (d(disk_used)/dt)
            stats['storage_server.disk_total'] = disk['total']
            stats['storage_server.disk_used'] = disk['used']
            stats['storage_server.disk_free_for_root'] = disk['free_for_root']
            stats['storage_server.disk_free_for_nonroot'] = disk['free_for_nonroot']
            stats['storage_server.disk_avail'] = disk['avail']
        except AttributeError:
            writeable = True
        except EnvironmentError:
            log.msg("OS call to get disk statistics failed", level=log.UNUSUAL)
            writeable = False

        if self.readonly_storage:
            stats['storage_server.disk_avail'] = 0
            writeable = False

        stats['storage_server.accepting_immutable_shares'] = int(writeable)
        s = self.bucket_counter.get_state()
        bucket_count = s.get("last-complete-bucket-count")
        if bucket_count:
            stats['storage_server.total_bucket_count'] = bucket_count
        return stats

    def get_available_space(self):
        """Returns available space for share storage in bytes, or None if no
        API to get this information is available."""

        if self.readonly_storage:
            return 0
        return fileutil.get_available_space(self.sharedir, self.reserved_space)

    def allocated_size(self):
        space = 0
        for bw in self._active_writers:
            space += bw.allocated_size()
        return space

    def remote_get_version(self):
        remaining_space = self.get_available_space()
        if remaining_space is None:
            # We're on a platform that has no API to get disk stats.
            remaining_space = 2**64

        # Unicode strings might be nicer, but for now sticking to bytes since
        # this is what the wire protocol has always been.
        version = { b"http://allmydata.org/tahoe/protocols/storage/v1" :
                    { b"maximum-immutable-share-size": remaining_space,
                      b"maximum-mutable-share-size": MAX_MUTABLE_SHARE_SIZE,
                      b"available-space": remaining_space,
                      b"tolerates-immutable-read-overrun": True,
                      b"delete-mutable-shares-with-zero-length-writev": True,
                      b"fills-holes-with-zero-bytes": True,
                      b"prevents-read-past-end-of-share-data": True,
                      },
                    b"application-version": allmydata.__full_version__.encode("utf-8"),
                    }
        return version

    def remote_allocate_buckets(self, storage_index,
                                renew_secret, cancel_secret,
                                sharenums, allocated_size,
                                canary, owner_num=0):
        # owner_num is not for clients to set, but rather it should be
        # curried into the PersonalStorageServer instance that is dedicated
        # to a particular owner.
        start = time.time()
        self.count("allocate")
        alreadygot = set()
        bucketwriters = {} # k: shnum, v: BucketWriter
        si_dir = storage_index_to_dir(storage_index)
        si_s = si_b2a(storage_index)

        log.msg("storage: allocate_buckets %r" % si_s)

        # in this implementation, the lease information (including secrets)
        # goes into the share files themselves. It could also be put into a
        # separate database. Note that the lease should not be added until
        # the BucketWriter has been closed.
        expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)

        max_space_per_bucket = allocated_size

        remaining_space = self.get_available_space()
        limited = remaining_space is not None
        if limited:
            # this is a bit conservative, since some of this allocated_size()
            # has already been written to disk, where it will show up in
            # get_available_space.
            remaining_space -= self.allocated_size()
        # self.readonly_storage causes remaining_space <= 0

        # fill alreadygot with all shares that we have, not just the ones
        # they asked about: this will save them a lot of work. Add or update
        # leases for all of them: if they want us to hold shares for this
        # file, they'll want us to hold leases for this file.
        for (shnum, fn) in self._get_bucket_shares(storage_index):
            alreadygot.add(shnum)
            sf = ShareFile(fn)
            sf.add_or_renew_lease(lease_info)

        for shnum in sharenums:
            incominghome = os.path.join(self.incomingdir, si_dir, "%d" % shnum)
            finalhome = os.path.join(self.sharedir, si_dir, "%d" % shnum)
            if os.path.exists(finalhome):
                # great! we already have it. easy.
                pass
            elif os.path.exists(incominghome):
                # Note that we don't create BucketWriters for shnums that
                # have a partial share (in incoming/), so if a second upload
                # occurs while the first is still in progress, the second
                # uploader will use different storage servers.
                pass
            elif (not limited) or (remaining_space >= max_space_per_bucket):
                # ok! we need to create the new share file.
                bw = BucketWriter(self, incominghome, finalhome,
                                  max_space_per_bucket, lease_info, canary)
                if self.no_storage:
                    bw.throw_out_all_data = True
                bucketwriters[shnum] = bw
                self._active_writers[bw] = 1
                if limited:
                    remaining_space -= max_space_per_bucket
            else:
                # bummer! not enough space to accept this bucket
                pass

        if bucketwriters:
            fileutil.make_dirs(os.path.join(self.sharedir, si_dir))

        self.add_latency("allocate", time.time() - start)
        return alreadygot, bucketwriters

    def _iter_share_files(self, storage_index):
        for shnum, filename in self._get_bucket_shares(storage_index):
            with open(filename, 'rb') as f:
                header = f.read(32)
            if header[:32] == MutableShareFile.MAGIC:
                sf = MutableShareFile(filename, self)
                # note: if the share has been migrated, the renew_lease()
                # call will throw an exception, with information to help the
                # client update the lease.
            elif header[:4] == struct.pack(">L", 1):
                sf = ShareFile(filename)
            else:
                continue # non-sharefile
            yield sf

    def remote_add_lease(self, storage_index, renew_secret, cancel_secret,
                         owner_num=1):
        start = time.time()
        self.count("add-lease")
        new_expire_time = time.time() + 31*24*60*60
        lease_info = LeaseInfo(owner_num,
                               renew_secret, cancel_secret,
                               new_expire_time, self.my_nodeid)
        for sf in self._iter_share_files(storage_index):
            sf.add_or_renew_lease(lease_info)
        self.add_latency("add-lease", time.time() - start)
        return None

    def remote_renew_lease(self, storage_index, renew_secret):
        start = time.time()
        self.count("renew")
        new_expire_time = time.time() + 31*24*60*60
        found_buckets = False
        for sf in self._iter_share_files(storage_index):
            found_buckets = True
            sf.renew_lease(renew_secret, new_expire_time)
        self.add_latency("renew", time.time() - start)
        if not found_buckets:
            raise IndexError("no such lease to renew")

    def bucket_writer_closed(self, bw, consumed_size):
        if self.stats_provider:
            self.stats_provider.count('storage_server.bytes_added', consumed_size)
        del self._active_writers[bw]

    def _get_bucket_shares(self, storage_index):
        """Return a list of (shnum, pathname) tuples for files that hold
        shares for this storage_index. In each tuple, 'shnum' will always be
        the integer form of the last component of 'pathname'."""
        storagedir = os.path.join(self.sharedir, storage_index_to_dir(storage_index))
        try:
            for f in os.listdir(storagedir):
                if NUM_RE.match(f):
                    filename = os.path.join(storagedir, f)
                    yield (int(f), filename)
        except OSError:
            # Commonly caused by there being no buckets at all.
            pass

    def remote_get_buckets(self, storage_index):
        start = time.time()
        self.count("get")
        si_s = si_b2a(storage_index)
        log.msg("storage: get_buckets %r" % si_s)
        bucketreaders = {} # k: sharenum, v: BucketReader
        for shnum, filename in self._get_bucket_shares(storage_index):
            bucketreaders[shnum] = BucketReader(self, filename,
                                                storage_index, shnum)
        self.add_latency("get", time.time() - start)
        return bucketreaders

    def get_leases(self, storage_index):
        """Provide an iterator that yields all of the leases attached to this
        bucket. Each lease is returned as a LeaseInfo instance.

        This method is not for client use.

        :note: Only for immutable shares.
        """
        # since all shares get the same lease data, we just grab the leases
        # from the first share
        try:
            shnum, filename = next(self._get_bucket_shares(storage_index))
            sf = ShareFile(filename)
            return sf.get_leases()
        except StopIteration:
            return iter([])

    def get_slot_leases(self, storage_index):
        """
        This method is not for client use.

        :note: Only for mutable shares.

        :return: An iterable of the leases attached to this slot.
        """
        for _, share_filename in self._get_bucket_shares(storage_index):
            share = MutableShareFile(share_filename)
            return share.get_leases()
        return []

    def _collect_mutable_shares_for_storage_index(self, bucketdir, write_enabler, si_s):
        """
        Gather up existing mutable shares for the given storage index.

        :param bytes bucketdir: The filesystem path containing shares for the
            given storage index.

        :param bytes write_enabler: The write enabler secret for the shares.

        :param bytes si_s: The storage index in encoded (base32) form.

        :raise BadWriteEnablerError: If the write enabler is not correct for
            any of the collected shares.

        :return dict[int, MutableShareFile]: The collected shares in a mapping
            from integer share numbers to ``MutableShareFile`` instances.
        """
        shares = {}
        if os.path.isdir(bucketdir):
            # shares exist if there is a file for them
            for sharenum_s in os.listdir(bucketdir):
                try:
                    sharenum = int(sharenum_s)
                except ValueError:
                    continue
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename, self)
                msf.check_write_enabler(write_enabler, si_s)
                shares[sharenum] = msf
        return shares

    def _evaluate_test_vectors(self, test_and_write_vectors, shares):
        """
        Execute test vectors against share data.

        :param test_and_write_vectors: See
            ``allmydata.interfaces.TestAndWriteVectorsForShares``.

        :param dict[int, MutableShareFile] shares: The shares against which to
            execute the vectors.

        :return bool: ``True`` if and only if all of the test vectors succeed
            against the given shares.
        """
        for sharenum in test_and_write_vectors:
            (testv, datav, new_length) = test_and_write_vectors[sharenum]
            if sharenum in shares:
                if not shares[sharenum].check_testv(testv):
                    self.log("testv failed: [%d]: %r" % (sharenum, testv))
                    return False
            else:
                # compare the vectors against an empty share, in which all
                # reads return empty strings.
                if not EmptyShare().check_testv(testv):
                    self.log("testv failed (empty): [%d] %r" % (sharenum,
                                                                testv))
                    return False
        return True

    def _evaluate_read_vectors(self, read_vector, shares):
        """
        Execute read vectors against share data.

        :param read_vector: See ``allmydata.interfaces.ReadVector``.

        :param dict[int, MutableShareFile] shares: The shares against which to
            execute the vector.

        :return dict[int, bytes]: The data read from the shares.
        """
        read_data = {}
        for sharenum, share in shares.items():
            read_data[sharenum] = share.readv(read_vector)
        return read_data

    def _evaluate_write_vectors(self, bucketdir, secrets, test_and_write_vectors, shares):
        """
        Execute write vectors against share data.

        :param bytes bucketdir: The parent directory holding the shares.  This
            is removed if the last share is removed from it.  If shares are
            created, they are created in it.

        :param secrets: A tuple of ``WriteEnablerSecret``,
            ``LeaseRenewSecret``, and ``LeaseCancelSecret``.  These secrets
            are used to initialize new shares.

        :param test_and_write_vectors: See
            ``allmydata.interfaces.TestAndWriteVectorsForShares``.

        :param dict[int, MutableShareFile] shares: The shares against which to
            execute the vectors.

        :return dict[int, MutableShareFile]: The shares which still exist
            after applying the vectors.
        """
        remaining_shares = {}

        for sharenum in test_and_write_vectors:
            (testv, datav, new_length) = test_and_write_vectors[sharenum]
            if new_length == 0:
                if sharenum in shares:
                    shares[sharenum].unlink()
            else:
                if sharenum not in shares:
                    # allocate a new share
                    allocated_size = 2000 # arbitrary, really
                    share = self._allocate_slot_share(bucketdir, secrets,
                                                      sharenum,
                                                      allocated_size,
                                                      owner_num=0)
                    shares[sharenum] = share
                shares[sharenum].writev(datav, new_length)
                remaining_shares[sharenum] = shares[sharenum]

            if new_length == 0:
                # delete bucket directories that exist but are empty.  They
                # might not exist if a client showed up and asked us to
                # truncate a share we weren't even holding.
                if os.path.exists(bucketdir) and [] == os.listdir(bucketdir):
                    os.rmdir(bucketdir)
        return remaining_shares

    def _make_lease_info(self, renew_secret, cancel_secret):
        """
        :return LeaseInfo: Information for a new lease for a share.
        """
        ownerid = 1 # TODO
        expire_time = time.time() + 31*24*60*60   # one month
        lease_info = LeaseInfo(ownerid,
                               renew_secret, cancel_secret,
                               expire_time, self.my_nodeid)
        return lease_info

    def _add_or_renew_leases(self, shares, lease_info):
        """
        Put the given lease onto the given shares.

        :param dict[int, MutableShareFile] shares: The shares to put the lease
            onto.

        :param LeaseInfo lease_info: The lease to put on the shares.
        """
        for share in six.viewvalues(shares):
            share.add_or_renew_lease(lease_info)

    def slot_testv_and_readv_and_writev(  # type: ignore # warner/foolscap#78
            self,
            storage_index,
            secrets,
            test_and_write_vectors,
            read_vector,
            renew_leases,
    ):
        """
        Read data from shares and conditionally write some data to them.

        :param bool renew_leases: If and only if this is ``True`` and the test
            vectors pass then shares in this slot will also have an updated
            lease applied to them.

        See ``allmydata.interfaces.RIStorageServer`` for details about other
        parameters and return value.
        """
        start = time.time()
        self.count("writev")
        si_s = si_b2a(storage_index)
        log.msg("storage: slot_writev %r" % si_s)
        si_dir = storage_index_to_dir(storage_index)
        (write_enabler, renew_secret, cancel_secret) = secrets
        bucketdir = os.path.join(self.sharedir, si_dir)

        # If collection succeeds we know the write_enabler is good for all
        # existing shares.
        shares = self._collect_mutable_shares_for_storage_index(
            bucketdir,
            write_enabler,
            si_s,
        )

        # Now evaluate test vectors.
        testv_is_good = self._evaluate_test_vectors(
            test_and_write_vectors,
            shares,
        )

        # now gather the read vectors, before we do any writes
        read_data = self._evaluate_read_vectors(
            read_vector,
            shares,
        )

        if testv_is_good:
            # now apply the write vectors
            remaining_shares = self._evaluate_write_vectors(
                bucketdir,
                secrets,
                test_and_write_vectors,
                shares,
            )
            if renew_leases:
                lease_info = self._make_lease_info(renew_secret, cancel_secret)
                self._add_or_renew_leases(remaining_shares, lease_info)

        # all done
        self.add_latency("writev", time.time() - start)
        return (testv_is_good, read_data)

    def remote_slot_testv_and_readv_and_writev(self, storage_index,
                                               secrets,
                                               test_and_write_vectors,
                                               read_vector):
        return self.slot_testv_and_readv_and_writev(
            storage_index,
            secrets,
            test_and_write_vectors,
            read_vector,
            renew_leases=True,
        )

    def _allocate_slot_share(self, bucketdir, secrets, sharenum,
                             allocated_size, owner_num=0):
        (write_enabler, renew_secret, cancel_secret) = secrets
        my_nodeid = self.my_nodeid
        fileutil.make_dirs(bucketdir)
        filename = os.path.join(bucketdir, "%d" % sharenum)
        share = create_mutable_sharefile(filename, my_nodeid, write_enabler,
                                         self)
        return share

    def remote_slot_readv(self, storage_index, shares, readv):
        start = time.time()
        self.count("readv")
        si_s = si_b2a(storage_index)
        lp = log.msg("storage: slot_readv %r %r" % (si_s, shares),
                     facility="tahoe.storage", level=log.OPERATIONAL)
        si_dir = storage_index_to_dir(storage_index)
        # shares exist if there is a file for them
        bucketdir = os.path.join(self.sharedir, si_dir)
        if not os.path.isdir(bucketdir):
            self.add_latency("readv", time.time() - start)
            return {}
        datavs = {}
        for sharenum_s in os.listdir(bucketdir):
            try:
                sharenum = int(sharenum_s)
            except ValueError:
                continue
            if sharenum in shares or not shares:
                filename = os.path.join(bucketdir, sharenum_s)
                msf = MutableShareFile(filename, self)
                datavs[sharenum] = msf.readv(readv)
        log.msg("returning shares %s" % (list(datavs.keys()),),
                facility="tahoe.storage", level=log.NOISY, parent=lp)
        self.add_latency("readv", time.time() - start)
        return datavs

    def remote_advise_corrupt_share(self, share_type, storage_index, shnum,
                                    reason):
        # This is a remote API, I believe, so this has to be bytes for legacy
        # protocol backwards compatibility reasons.
        assert isinstance(share_type, bytes)
        assert isinstance(reason, bytes), "%r is not bytes" % (reason,)
        fileutil.make_dirs(self.corruption_advisory_dir)
        now = time_format.iso_utc(sep="T")
        si_s = si_b2a(storage_index)
        # windows can't handle colons in the filename
        fn = os.path.join(self.corruption_advisory_dir,
                          "%s--%s-%d" % (now, str(si_s, "utf-8"), shnum)).replace(":","")
        with open(fn, "w") as f:
            f.write("report: Share Corruption\n")
            f.write("type: %s\n" % bytes_to_native_str(share_type))
            f.write("storage_index: %s\n" % bytes_to_native_str(si_s))
            f.write("share_number: %d\n" % shnum)
            f.write("\n")
            f.write(bytes_to_native_str(reason))
            f.write("\n")
        log.msg(format=("client claims corruption in (%(share_type)s) " +
                        "%(si)s-%(shnum)d: %(reason)s"),
                share_type=share_type, si=si_s, shnum=shnum, reason=reason,
                level=log.SCARY, umid="SGx2fA")
        return None
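
# A minimal usage sketch (illustrative only; the storage directory and the
# all-zero 20-byte node id below are hypothetical -- a real node constructs
# this service via allmydata.client, which supplies its Tub's node id):
#
#     ss = StorageServer("/tmp/demo-storage", b"\x00" * 20)
#     version = ss.remote_get_version()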