source: trunk/src/allmydata/mutable/publish.py

Last change on this file was 70c0607, checked in by Itamar Turner-Trauring <itamar@…>, at 2021-02-24T19:36:15Z

Enable BytesWarning across all tests ported to Python 3, fixing the problems that it caught.

  • Property mode set to 100644
File size: 55.0 KB
1"""
2Ported to Python 3.
3"""
4from __future__ import division
5from __future__ import absolute_import
6from __future__ import print_function
7from __future__ import unicode_literals
8
9from future.utils import PY2
10if PY2:
11    from future.builtins import filter, map, zip, ascii, chr, hex, input, next, oct, open, pow, round, super, bytes, dict, list, object, range, str, max, min  # noqa: F401
12
13import os, time
14from io import BytesIO
15from itertools import count
16from zope.interface import implementer
17from twisted.internet import defer
18from twisted.python import failure
19
20from allmydata.crypto import aes
21from allmydata.crypto import rsa
22from allmydata.interfaces import IPublishStatus, SDMF_VERSION, MDMF_VERSION, \
23                                 IMutableUploadable
24from allmydata.util import base32, hashutil, mathutil, log
25from allmydata.util.dictutil import DictOfSets
26from allmydata import hashtree, codec
27from allmydata.storage.server import si_b2a
28from foolscap.api import eventually, fireEventually
29from allmydata.mutable.common import MODE_WRITE, MODE_CHECK, MODE_REPAIR, \
30     UncoordinatedWriteError, NotEnoughServersError
31from allmydata.mutable.servermap import ServerMap
32from allmydata.mutable.layout import get_version_from_checkstring,\
33                                     unpack_mdmf_checkstring, \
34                                     unpack_sdmf_checkstring, \
35                                     MDMFSlotWriteProxy, \
36                                     SDMFSlotWriteProxy
37
38KiB = 1024
39DEFAULT_MAX_SEGMENT_SIZE = 128 * KiB
40PUSHING_BLOCKS_STATE = 0
41PUSHING_EVERYTHING_ELSE_STATE = 1
42DONE_STATE = 2
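# Publish runs as a small state machine: first the encrypted, FEC-encoded
# blocks are pushed, then everything else (encrypted private key, hash
# trees, signature, verification key), and then the publish is finished.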
43
44@implementer(IPublishStatus)
45class PublishStatus(object):
46    statusid_counter = count(0)
47    def __init__(self):
48        self.timings = {}
49        self.timings["send_per_server"] = {}
50        self.timings["encrypt"] = 0.0
51        self.timings["encode"] = 0.0
52        self.servermap = None
53        self._problems = {}
54        self.active = True
55        self.storage_index = None
56        self.helper = False
57        self.encoding = ("?", "?")
58        self.size = None
59        self.status = "Not started"
60        self.progress = 0.0
61        self.counter = next(self.statusid_counter)
62        self.started = time.time()
63
64    def add_per_server_time(self, server, elapsed):
65        if server not in self.timings["send_per_server"]:
66            self.timings["send_per_server"][server] = []
67        self.timings["send_per_server"][server].append(elapsed)
68    def accumulate_encode_time(self, elapsed):
69        self.timings["encode"] += elapsed
70    def accumulate_encrypt_time(self, elapsed):
71        self.timings["encrypt"] += elapsed
72
73    def get_started(self):
74        return self.started
75    def get_storage_index(self):
76        return self.storage_index
77    def get_encoding(self):
78        return self.encoding
79    def using_helper(self):
80        return self.helper
81    def get_servermap(self):
82        return self.servermap
83    def get_size(self):
84        return self.size
85    def get_status(self):
86        return self.status
87    def get_progress(self):
88        return self.progress
89    def get_active(self):
90        return self.active
91    def get_counter(self):
92        return self.counter
93    def get_problems(self):
94        return self._problems
95
96    def set_storage_index(self, si):
97        self.storage_index = si
98    def set_helper(self, helper):
99        self.helper = helper
100    def set_servermap(self, servermap):
101        self.servermap = servermap
102    def set_encoding(self, k, n):
103        self.encoding = (k, n)
104    def set_size(self, size):
105        self.size = size
106    def set_status(self, status):
107        self.status = status
108    def set_progress(self, value):
109        self.progress = value
110    def set_active(self, value):
111        self.active = value
112
113class LoopLimitExceededError(Exception):
114    pass
115
116class Publish(object):
117    """I represent a single act of publishing the mutable file to the grid. I
118    will only publish my data if the servermap I am using still represents
119    the current state of the world.
120
121    To make the initial publish, set servermap to None.
122    """
123
124    def __init__(self, filenode, storage_broker, servermap):
125        self._node = filenode
126        self._storage_broker = storage_broker
127        self._servermap = servermap
128        self._storage_index = self._node.get_storage_index()
129        self._log_prefix = prefix = si_b2a(self._storage_index)[:5]
130        num = self.log("Publish(%r): starting" % prefix, parent=None)
131        self._log_number = num
132        self._running = True
133        self._first_write_error = None
134        self._last_failure = None
135
136        self._status = PublishStatus()
137        self._status.set_storage_index(self._storage_index)
138        self._status.set_helper(False)
139        self._status.set_progress(0.0)
140        self._status.set_active(True)
141        self._version = self._node.get_version()
142        assert self._version in (SDMF_VERSION, MDMF_VERSION)
143
144
145    def get_status(self):
146        return self._status
147
148    def log(self, *args, **kwargs):
149        if 'parent' not in kwargs:
150            kwargs['parent'] = self._log_number
151        if "facility" not in kwargs:
152            kwargs["facility"] = "tahoe.mutable.publish"
153        return log.msg(*args, **kwargs)
154
155
156    def update(self, data, offset, blockhashes, version):
157        """
158        I replace the contents of this file with the contents of data,
159        starting at offset. I return a Deferred that fires with None
160        when the replacement has been completed, or with an error if
161        something went wrong during the process.
162
163        Note that this process will not upload new shares. If the file
164        being updated is in need of repair, callers will have to repair
165        it on their own.
166        """
167        # How this works:
168        # 1: Make server assignments. We'll assign each share that we know
169        # about on the grid to that server that currently holds that
170        # share, and will not place any new shares.
171        # 2: Setup encoding parameters. Most of these will stay the same
172        # -- datalength will change, as will some of the offsets.
173        # 3. Upload the new segments.
174        # 4. Be done.
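        # Rough call sketch (hypothetical names, for orientation only):
        #   p = Publish(filenode, storage_broker, servermap)
        #   d = p.update(uploadable, offset, blockhashes, verinfo)
        # where `uploadable` is an IMutableUploadable covering the changed
        # region; callers typically wrap the new bytes in a
        # TransformingUploadable (defined at the bottom of this file) so
        # that partial boundary segments get merged with the data already
        # on the grid.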
175        assert IMutableUploadable.providedBy(data)
176
177        self.data = data
178
179        # XXX: Use the MutableFileVersion instead.
180        self.datalength = self._node.get_size()
181        if data.get_size() > self.datalength:
182            self.datalength = data.get_size()
183
184        self.log("starting update")
185        self.log("adding new data of length %d at offset %d" % \
186                    (data.get_size(), offset))
187        self.log("new data length is %d" % self.datalength)
188        self._status.set_size(self.datalength)
189        self._status.set_status("Started")
190        self._started = time.time()
191
192        self.done_deferred = defer.Deferred()
193
194        self._writekey = self._node.get_writekey()
195        assert self._writekey, "need write capability to publish"
196
197        # first, which servers will we publish to? We require that the
198        # servermap was updated in MODE_WRITE, so we can depend upon the
199        # serverlist computed by that process instead of computing our own.
200        assert self._servermap
201        assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
202        # we will push a version that is one larger than anything present
203        # in the grid, according to the servermap.
204        self._new_seqnum = self._servermap.highest_seqnum() + 1
205        self._status.set_servermap(self._servermap)
206
207        self.log(format="new seqnum will be %(seqnum)d",
208                 seqnum=self._new_seqnum, level=log.NOISY)
209
210        # We're updating an existing file, so all of the following
211        # should be available.
212        self.readkey = self._node.get_readkey()
213        self.required_shares = self._node.get_required_shares()
214        assert self.required_shares is not None
215        self.total_shares = self._node.get_total_shares()
216        assert self.total_shares is not None
217        self._status.set_encoding(self.required_shares, self.total_shares)
218
219        self._pubkey = self._node.get_pubkey()
220        assert self._pubkey
221        self._privkey = self._node.get_privkey()
222        assert self._privkey
223        self._encprivkey = self._node.get_encprivkey()
224
225        sb = self._storage_broker
226        full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
227        self.full_serverlist = full_serverlist # for use later, immutable
228        self.bad_servers = set() # servers who have errbacked/refused requests
229
230        # This will set self.segment_size, self.num_segments, and
231        # self.fec. TODO: Does it know how to do the offset? Probably
232        # not. So do that part next.
233        self.setup_encoding_parameters(offset=offset)
234
235        # if we experience any surprises (writes which were rejected because
236        # our test vector did not match, or shares which we didn't expect to
237        # see), we set this flag and report an UncoordinatedWriteError at the
238        # end of the publish process.
239        self.surprised = False
240
241        # we keep track of three tables. The first is our goal: which share
242        # we want to see on which servers. This is initially populated by the
243        # existing servermap.
244        self.goal = set() # pairs of (server, shnum) tuples
245
246        # the second is the number of outstanding queries: those that are in flight and
247        # may or may not be delivered, accepted, or acknowledged. This is
248        # incremented when a query is sent, and decremented when the response
249        # returns or errbacks.
250        self.num_outstanding = 0
251
252        # the third is a table of successes: shares which have actually been
253        # placed. These are populated when responses come back with success.
254        # When self.placed == self.goal, we're done.
255        self.placed = set() # (server, shnum) tuples
256
257        self.bad_share_checkstrings = {}
258
259        # This is set at the last step of the publishing process.
260        self.versioninfo = ""
261
262        # we use the servermap to populate the initial goal: this way we will
263        # try to update each existing share in place. Since we're
264        # updating, we ignore damaged and missing shares -- callers must
265        # do a repair to repair and recreate these.
266        self.goal = set(self._servermap.get_known_shares())
267
268        # shnum -> set of IMutableSlotWriter
269        self.writers = DictOfSets()
270
271        # SDMF files are updated differently.
272        self._version = MDMF_VERSION
273        writer_class = MDMFSlotWriteProxy
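        # (In-place update assumes MDMF. SDMF files are single-segment, so
        # "updating" one amounts to republishing it in full via publish().)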
274
275        # For each (server, shnum) in self.goal, we make a
276        # write proxy for that server. We'll use this to write
277        # shares to the server.
278        for (server,shnum) in self.goal:
279            write_enabler = self._node.get_write_enabler(server)
280            renew_secret = self._node.get_renewal_secret(server)
281            cancel_secret = self._node.get_cancel_secret(server)
282            secrets = (write_enabler, renew_secret, cancel_secret)
283
284            writer = writer_class(shnum,
285                                  server.get_storage_server(),
286                                  self._storage_index,
287                                  secrets,
288                                  self._new_seqnum,
289                                  self.required_shares,
290                                  self.total_shares,
291                                  self.segment_size,
292                                  self.datalength)
293
294            self.writers.add(shnum, writer)
295            writer.server = server
296            known_shares = self._servermap.get_known_shares()
297            assert (server, shnum) in known_shares
298            old_versionid, old_timestamp = known_shares[(server,shnum)]
299            (old_seqnum, old_root_hash, old_salt, old_segsize,
300             old_datalength, old_k, old_N, old_prefix,
301             old_offsets_tuple) = old_versionid
302            writer.set_checkstring(old_seqnum,
303                                   old_root_hash,
304                                   old_salt)
305
306        # Our remote shares will not have a complete checkstring until
307        # after we are done writing share data and have started to write
308        # blocks. In the meantime, we need to know what to look for when
309        # writing, so that we can detect UncoordinatedWriteErrors.
310        self._checkstring = self._get_some_writer().get_checkstring()
311
312        # Now, we start pushing shares.
313        self._status.timings["setup"] = time.time() - self._started
314        # First, we encrypt, encode, and publish the shares that we need
315        # to encrypt, encode, and publish.
316
317        # Our update process fetched these for us. We need to update
318        # them in place as publishing happens.
319        self.blockhashes = {} # (shnum, [blockhashes])
320        for (i, bht) in list(blockhashes.items()):
321            # We need to extract the leaves from our old hash tree.
322            old_segcount = mathutil.div_ceil(version[4],
323                                             version[3])
324            h = hashtree.IncompleteHashTree(old_segcount)
325            bht = dict(enumerate(bht))
326            h.set_hashes(bht)
327            leaves = h[h.get_leaf_index(0):]
328            for j in range(self.num_segments - len(leaves)):
329                leaves.append(None)
330
331            assert len(leaves) >= self.num_segments
332            self.blockhashes[i] = leaves
333            # This list will now be the leaves that were set during the
334            # initial upload + enough empty hashes to make it a
335            # power-of-two. If we exceed a power of two boundary, we
336            # should be encoding the file over again, and should not be
337            # here. So, we have
338            #assert len(self.blockhashes[i]) == \
339            #    hashtree.roundup_pow2(self.num_segments), \
340            #        len(self.blockhashes[i])
341            # XXX: Except this doesn't work. Figure out why.
342
343        # These are filled in later, after we've modified the block hash
344        # tree suitably.
345        self.sharehash_leaves = None # eventually [sharehashes]
346        self.sharehashes = {} # shnum -> [sharehash leaves necessary to
347                              # validate the share]
348
349        self.log("Starting push")
350
351        self._state = PUSHING_BLOCKS_STATE
352        self._push()
353
354        return self.done_deferred
355
356
357    def publish(self, newdata):
358        """Publish the filenode's current contents.  Returns a Deferred that
359        fires (with None) when the publish has done as much work as it's ever
360        going to do, or errbacks with ConsistencyError if it detects a
361        simultaneous write.
362        """
363
364        # 0. Setup encoding parameters, encoder, and other such things.
365        # 1. Encrypt, encode, and publish segments.
366        assert IMutableUploadable.providedBy(newdata)
367
368        self.data = newdata
369        self.datalength = newdata.get_size()
370        #if self.datalength >= DEFAULT_MAX_SEGMENT_SIZE:
371        #    self._version = MDMF_VERSION
372        #else:
373        #    self._version = SDMF_VERSION
374
375        self.log("starting publish, datalen is %s" % self.datalength)
376        self._status.set_size(self.datalength)
377        self._status.set_status("Started")
378        self._started = time.time()
379
380        self.done_deferred = defer.Deferred()
381
382        self._writekey = self._node.get_writekey()
383        assert self._writekey, "need write capability to publish"
384
385        # first, which servers will we publish to? We require that the
386        # servermap was updated in MODE_WRITE, so we can depend upon the
387        # serverlist computed by that process instead of computing our own.
388        if self._servermap:
389            assert self._servermap.get_last_update()[0] in (MODE_WRITE, MODE_CHECK, MODE_REPAIR)
390            # we will push a version that is one larger than anything present
391            # in the grid, according to the servermap.
392            self._new_seqnum = self._servermap.highest_seqnum() + 1
393        else:
394            # If we don't have a servermap, that's because we're doing the
395            # initial publish
396            self._new_seqnum = 1
397            self._servermap = ServerMap()
398        self._status.set_servermap(self._servermap)
399
400        self.log(format="new seqnum will be %(seqnum)d",
401                 seqnum=self._new_seqnum, level=log.NOISY)
402
403        # having an up-to-date servermap (or using a filenode that was just
404        # created for the first time) also guarantees that the following
405        # fields are available
406        self.readkey = self._node.get_readkey()
407        self.required_shares = self._node.get_required_shares()
408        assert self.required_shares is not None
409        self.total_shares = self._node.get_total_shares()
410        assert self.total_shares is not None
411        self._status.set_encoding(self.required_shares, self.total_shares)
412
413        self._pubkey = self._node.get_pubkey()
414        assert self._pubkey
415        self._privkey = self._node.get_privkey()
416        assert self._privkey
417        self._encprivkey = self._node.get_encprivkey()
418
419        sb = self._storage_broker
420        full_serverlist = list(sb.get_servers_for_psi(self._storage_index))
421        self.full_serverlist = full_serverlist # for use later, immutable
422        self.bad_servers = set() # servers who have errbacked/refused requests
423
424        # This will set self.segment_size, self.num_segments, and
425        # self.fec.
426        self.setup_encoding_parameters()
427
428        # if we experience any surprises (writes which were rejected because
429        # our test vector did not match, or shares which we didn't expect to
430        # see), we set this flag and report an UncoordinatedWriteError at the
431        # end of the publish process.
432        self.surprised = False
433
434        # we keep track of three tables. The first is our goal: which share
435        # we want to see on which servers. This is initially populated by the
436        # existing servermap.
437        self.goal = set() # pairs of (server, shnum) tuples
438
439        # the second is the number of outstanding queries: those that are in flight and
440        # may or may not be delivered, accepted, or acknowledged. This is
441        # incremented when a query is sent, and decremented when the response
442        # returns or errbacks.
443        self.num_outstanding = 0
444
445        # the third is a table of successes: shares which have actually been
446        # placed. These are populated when responses come back with success.
447        # When self.placed == self.goal, we're done.
448        self.placed = set() # (server, shnum) tuples
449
450        self.bad_share_checkstrings = {}
451
452        # This is set at the last step of the publishing process.
453        self.versioninfo = ""
454
455        # we use the servermap to populate the initial goal: this way we will
456        # try to update each existing share in place.
457        self.goal = set(self._servermap.get_known_shares())
458
459        # then we add in all the shares that were bad (corrupted, bad
460        # signatures, etc). We want to replace these.
461        for key, old_checkstring in list(self._servermap.get_bad_shares().items()):
462            (server, shnum) = key
463            self.goal.add( (server,shnum) )
464            self.bad_share_checkstrings[(server,shnum)] = old_checkstring
465
466        # TODO: Make this part do server selection.
467        self.update_goal()
468
469        # shnum -> set of IMutableSlotWriter
470        self.writers = DictOfSets()
471
472        if self._version == MDMF_VERSION:
473            writer_class = MDMFSlotWriteProxy
474        else:
475            writer_class = SDMFSlotWriteProxy
476
477        # For each (server, shnum) in self.goal, we make a
478        # write proxy for that server. We'll use this to write
479        # shares to the server.
480        for (server,shnum) in self.goal:
481            write_enabler = self._node.get_write_enabler(server)
482            renew_secret = self._node.get_renewal_secret(server)
483            cancel_secret = self._node.get_cancel_secret(server)
484            secrets = (write_enabler, renew_secret, cancel_secret)
485
486            writer =  writer_class(shnum,
487                                   server.get_storage_server(),
488                                   self._storage_index,
489                                   secrets,
490                                   self._new_seqnum,
491                                   self.required_shares,
492                                   self.total_shares,
493                                   self.segment_size,
494                                   self.datalength)
495            self.writers.add(shnum, writer)
496            writer.server = server
497            known_shares = self._servermap.get_known_shares()
498            if (server, shnum) in known_shares:
499                old_versionid, old_timestamp = known_shares[(server,shnum)]
500                (old_seqnum, old_root_hash, old_salt, old_segsize,
501                 old_datalength, old_k, old_N, old_prefix,
502                 old_offsets_tuple) = old_versionid
503                writer.set_checkstring(old_seqnum,
504                                       old_root_hash,
505                                       old_salt)
506            elif (server, shnum) in self.bad_share_checkstrings:
507                old_checkstring = self.bad_share_checkstrings[(server, shnum)]
508                writer.set_checkstring(old_checkstring)
509
510        # Our remote shares will not have a complete checkstring until
511        # after we are done writing share data and have started to write
512        # blocks. In the meantime, we need to know what to look for when
513        # writing, so that we can detect UncoordinatedWriteErrors.
514        self._checkstring = self._get_some_writer().get_checkstring()
515
516        # Now, we start pushing shares.
517        self._status.timings["setup"] = time.time() - self._started
518        # First, we encrypt, encode, and publish the shares that we need
519        # to encrypt, encode, and publish.
520
521        # This will eventually hold the block hash chain for each share
522        # that we publish. We define it this way so that empty publishes
523        # will still have something to write to the remote slot.
524        self.blockhashes = dict([(i, []) for i in range(self.total_shares)])
525        for i in range(self.total_shares):
526            blocks = self.blockhashes[i]
527            for j in range(self.num_segments):
528                blocks.append(None)
529        self.sharehash_leaves = None # eventually [sharehashes]
530        self.sharehashes = {} # shnum -> [sharehash leaves necessary to
531                              # validate the share]
532
533        self.log("Starting push")
534
535        self._state = PUSHING_BLOCKS_STATE
536        self._push()
537
538        return self.done_deferred
539
540    def _get_some_writer(self):
541        return list(list(self.writers.values())[0])[0]
542
543    def _update_status(self):
544        self._status.set_status("Sending Shares: %d placed out of %d, "
545                                "%d messages outstanding" %
546                                (len(self.placed),
547                                 len(self.goal),
548                                 self.num_outstanding))
549        self._status.set_progress(1.0 * len(self.placed) / len(self.goal))
550
551
552    def setup_encoding_parameters(self, offset=0):
553        if self._version == MDMF_VERSION:
554            segment_size = DEFAULT_MAX_SEGMENT_SIZE # 128 KiB by default
555        else:
556            segment_size = self.datalength # SDMF is only one segment
557        # this must be a multiple of self.required_shares
558        segment_size = mathutil.next_multiple(segment_size,
559                                              self.required_shares)
560        self.segment_size = segment_size
561
562        # Calculate the starting segment for the upload.
563        if segment_size:
564            # We use div_ceil instead of integer division here because
565            # it is semantically correct.
566            # If datalength isn't an even multiple of segment_size, but
567            # is larger than segment_size, datalength // segment_size
568            # will be the largest number such that num <= datalength and
569            # num % segment_size == 0. But that's not what we want,
570            # because it ignores the extra data. div_ceil will give us
571            # the right number of segments for the data that we're
572            # given.
573            self.num_segments = mathutil.div_ceil(self.datalength,
574                                                  segment_size)
575
576            self.starting_segment = offset // segment_size
577
578        else:
579            self.num_segments = 0
580            self.starting_segment = 0
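        # (Illustrative example: a ~300 KiB file with ~128 KiB segments
        # needs div_ceil(300, 128) == 3 segments, whereas 300 // 128 == 2
        # would silently ignore the ~44 KiB tail.)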
581
582
583        self.log("building encoding parameters for file")
584        self.log("got segsize %d" % self.segment_size)
585        self.log("got %d segments" % self.num_segments)
586
587        if self._version == SDMF_VERSION:
588            assert self.num_segments in (0, 1) # SDMF
589        # calculate the tail segment size.
590
591        if segment_size and self.datalength:
592            self.tail_segment_size = self.datalength % segment_size
593            self.log("got tail segment size %d" % self.tail_segment_size)
594        else:
595            self.tail_segment_size = 0
596
597        if self.tail_segment_size == 0 and segment_size:
598            # The tail segment is the same size as the other segments.
599            self.tail_segment_size = segment_size
600
601        # Make FEC encoders
602        fec = codec.CRSEncoder()
603        fec.set_params(self.segment_size,
604                       self.required_shares, self.total_shares)
605        self.piece_size = fec.get_block_size()
606        self.fec = fec
607
608        if self.tail_segment_size == self.segment_size:
609            self.tail_fec = self.fec
610        else:
611            tail_fec = codec.CRSEncoder()
612            tail_fec.set_params(self.tail_segment_size,
613                                self.required_shares,
614                                self.total_shares)
615            self.tail_fec = tail_fec
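        # (For scale, illustrative numbers only: with required_shares k=3
        # and a ~128 KiB segment, each FEC block ("piece") is about
        # segment_size / 3, i.e. roughly 43 KiB. tail_fec only differs
        # when the final segment is shorter than the rest.)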
616
617        self._current_segment = self.starting_segment
618        self.end_segment = self.num_segments - 1
619        # Now figure out where the last segment should be.
620        if self.data.get_size() != self.datalength:
621            # We're updating a few segments in the middle of a mutable
622            # file, so we don't want to republish the whole thing.
623            # (we don't have enough data to do that even if we wanted
624            # to)
625            end = self.data.get_size()
626            self.end_segment = end // segment_size
627            if end % segment_size == 0:
628                self.end_segment -= 1
629
630        self.log("got start segment %d" % self.starting_segment)
631        self.log("got end segment %d" % self.end_segment)
632
633
634    def _push(self, ignored=None):
635        """
636        I manage state transitions. In particular, I see that we still
637        have a good enough number of writers to complete the upload
638        successfully.
639        """
640        # Can we still successfully publish this file?
641        # TODO: Keep track of outstanding queries before aborting the
642        #       process.
643        num_shnums = len(self.writers)
644        if num_shnums < self.required_shares or self.surprised:
645            return self._failure()
646
647        # Figure out what we need to do next. Each of these needs to
648        # return a deferred so that we don't block execution when this
649        # is first called in the upload method.
650        if self._state == PUSHING_BLOCKS_STATE:
651            return self.push_segment(self._current_segment)
652
653        elif self._state == PUSHING_EVERYTHING_ELSE_STATE:
654            return self.push_everything_else()
655
656        # If we make it to this point, we were successful in placing the
657        # file.
658        return self._done()
659
660
661    def push_segment(self, segnum):
662        if self.num_segments == 0 and self._version == SDMF_VERSION:
663            self._add_dummy_salts()
664
665        if segnum > self.end_segment:
666            # We don't have any more segments to push.
667            self._state = PUSHING_EVERYTHING_ELSE_STATE
668            return self._push()
669
670        d = self._encode_segment(segnum)
671        d.addCallback(self._push_segment, segnum)
672        def _increment_segnum(ign):
673            self._current_segment += 1
674        # XXX: I don't think we need to do addBoth here -- any errBacks
675        # should be handled within push_segment.
676        d.addCallback(_increment_segnum)
677        d.addCallback(self._turn_barrier)
678        d.addCallback(self._push)
679        d.addErrback(self._failure)
680
681
682    def _turn_barrier(self, result):
683        """
684        I help the publish process avoid the recursion limit issues
685        described in #237.
686        """
687        return fireEventually(result)
688
689
690    def _add_dummy_salts(self):
691        """
692        SDMF files need a salt even if they're empty, or the signature
693        won't make sense. This method adds a dummy salt to each of our
694        SDMF writers so that they can write the signature later.
695        """
696        salt = os.urandom(16)
697        assert self._version == SDMF_VERSION
698
699        for shnum, writers in self.writers.items():
700            for writer in writers:
701                writer.put_salt(salt)
702
703
704    def _encode_segment(self, segnum):
705        """
706        I encrypt and encode the segment segnum.
707        """
708        started = time.time()
709
710        if segnum + 1 == self.num_segments:
711            segsize = self.tail_segment_size
712        else:
713            segsize = self.segment_size
714
715
716        self.log("Pushing segment %d of %d" % (segnum + 1, self.num_segments))
717        data = self.data.read(segsize)
718        if not isinstance(data, bytes):
719            # XXX: Why does this return a list?
720            data = b"".join(data)
721
722        assert len(data) == segsize, len(data)
723
724        salt = os.urandom(16)
725
726        key = hashutil.ssk_readkey_data_hash(salt, self.readkey)
727        self._status.set_status("Encrypting")
728        encryptor = aes.create_encryptor(key)
729        crypttext = aes.encrypt_data(encryptor, data)
730        assert len(crypttext) == len(data)
731
732        now = time.time()
733        self._status.accumulate_encrypt_time(now - started)
734        started = now
735
736        # now apply FEC
737        if segnum + 1 == self.num_segments:
738            fec = self.tail_fec
739        else:
740            fec = self.fec
741
742        self._status.set_status("Encoding")
743        crypttext_pieces = [None] * self.required_shares
744        piece_size = fec.get_block_size()
745        for i in range(len(crypttext_pieces)):
746            offset = i * piece_size
747            piece = crypttext[offset:offset+piece_size]
748            piece = piece + b"\x00"*(piece_size - len(piece)) # padding
749            crypttext_pieces[i] = piece
750            assert len(piece) == piece_size
751        d = fec.encode(crypttext_pieces)
752        def _done_encoding(res):
753            elapsed = time.time() - started
754            self._status.accumulate_encode_time(elapsed)
755            return (res, salt)
756        d.addCallback(_done_encoding)
757        return d
758
759
760    def _push_segment(self, encoded_and_salt, segnum):
761        """
762        I push (data, salt) as segment number segnum.
763        """
764        results, salt = encoded_and_salt
765        shares, shareids = results
766        self._status.set_status("Pushing segment")
767        for i in range(len(shares)):
768            sharedata = shares[i]
769            shareid = shareids[i]
770            if self._version == MDMF_VERSION:
771                hashed = salt + sharedata
772            else:
773                hashed = sharedata
774            block_hash = hashutil.block_hash(hashed)
775            self.blockhashes[shareid][segnum] = block_hash
776            # find the writer for this share
777            writers = self.writers[shareid]
778            for writer in writers:
779                writer.put_block(sharedata, segnum, salt)
780
781
782    def push_everything_else(self):
783        """
784        I put everything else associated with a share.
785        """
786        self._pack_started = time.time()
787        self.push_encprivkey()
788        self.push_blockhashes()
789        self.push_sharehashes()
790        self.push_toplevel_hashes_and_signature()
791        d = self.finish_publishing()
792        def _change_state(ignored):
793            self._state = DONE_STATE
794        d.addCallback(_change_state)
795        d.addCallback(self._push)
796        return d
797
798
799    def push_encprivkey(self):
800        encprivkey = self._encprivkey
801        self._status.set_status("Pushing encrypted private key")
802        for shnum, writers in self.writers.items():
803            for writer in writers:
804                writer.put_encprivkey(encprivkey)
805
806
807    def push_blockhashes(self):
808        self.sharehash_leaves = [None] * len(self.blockhashes)
809        self._status.set_status("Building and pushing block hash tree")
810        for shnum, blockhashes in list(self.blockhashes.items()):
811            t = hashtree.HashTree(blockhashes)
812            self.blockhashes[shnum] = list(t)
813            # set the leaf for future use.
814            self.sharehash_leaves[shnum] = t[0]
815
816            writers = self.writers[shnum]
817            for writer in writers:
818                writer.put_blockhashes(self.blockhashes[shnum])
819
820
821    def push_sharehashes(self):
822        self._status.set_status("Building and pushing share hash chain")
823        share_hash_tree = hashtree.HashTree(self.sharehash_leaves)
824        for shnum in range(len(self.sharehash_leaves)):
825            needed_indices = share_hash_tree.needed_hashes(shnum)
826            self.sharehashes[shnum] = dict( [ (i, share_hash_tree[i])
827                                             for i in needed_indices] )
828            writers = self.writers[shnum]
829            for writer in writers:
830                writer.put_sharehashes(self.sharehashes[shnum])
831        self.root_hash = share_hash_tree[0]
832
833
834    def push_toplevel_hashes_and_signature(self):
835        # We need to do three things here:
836        #   - Push the root hash and salt hash
837        #   - Get the checkstring of the resulting layout; sign that.
838        #   - Push the signature
839        self._status.set_status("Pushing root hashes and signature")
840        for shnum in range(self.total_shares):
841            writers = self.writers[shnum]
842            for writer in writers:
843                writer.put_root_hash(self.root_hash)
844        self._update_checkstring()
845        self._make_and_place_signature()
846
847
848    def _update_checkstring(self):
849        """
850        After putting the root hash, MDMF files will have the
851        checkstring written to the storage server. This means that we
852        can update our copy of the checkstring so we can detect
853        uncoordinated writes. SDMF files will have the same checkstring,
854        so we need not do anything.
855        """
856        self._checkstring = self._get_some_writer().get_checkstring()
857
858
859    def _make_and_place_signature(self):
860        """
861        I create and place the signature.
862        """
863        started = time.time()
864        self._status.set_status("Signing prefix")
865        signable = self._get_some_writer().get_signable()
866        self.signature = rsa.sign_data(self._privkey, signable)
867
868        for (shnum, writers) in self.writers.items():
869            for writer in writers:
870                writer.put_signature(self.signature)
871        self._status.timings['sign'] = time.time() - started
872
873
874    def finish_publishing(self):
875        # We're almost done -- we just need to put the verification key
876        # and the offsets
877        started = time.time()
878        self._status.set_status("Pushing shares")
879        self._started_pushing = started
880        ds = []
881        verification_key = rsa.der_string_from_verifying_key(self._pubkey)
882
883        for (shnum, writers) in list(self.writers.copy().items()):
884            for writer in writers:
885                writer.put_verification_key(verification_key)
886                self.num_outstanding += 1
887                def _no_longer_outstanding(res):
888                    self.num_outstanding -= 1
889                    return res
890
891                d = writer.finish_publishing()
892                d.addBoth(_no_longer_outstanding)
893                d.addErrback(self._connection_problem, writer)
894                d.addCallback(self._got_write_answer, writer, started)
895                ds.append(d)
896        self._record_verinfo()
897        self._status.timings['pack'] = time.time() - started
898        return defer.DeferredList(ds)
899
900
901    def _record_verinfo(self):
902        self.versioninfo = self._get_some_writer().get_verinfo()
903
904
905    def _connection_problem(self, f, writer):
906        """
907        We ran into a connection problem while working with writer, and
908        need to deal with that.
909        """
910        self.log("found problem: %s" % str(f))
911        self._last_failure = f
912        self.writers.discard(writer.shnum, writer)
913
914
915    def log_goal(self, goal, message=""):
916        logmsg = [message]
917        for (shnum, server) in sorted([(s,p) for (p,s) in goal], key=lambda t: (id(t[0]), id(t[1]))):
918            logmsg.append("sh%d to [%r]" % (shnum, server.get_name()))
919        self.log("current goal: %s" % (", ".join(logmsg)), level=log.NOISY)
920        self.log("we are planning to push new seqnum=#%d" % self._new_seqnum,
921                 level=log.NOISY)
922
923    def update_goal(self):
924        self.log_goal(self.goal, "before update: ")
925
926        # first, remove any bad servers from our goal
927        self.goal = set([ (server, shnum)
928                          for (server, shnum) in self.goal
929                          if server not in self.bad_servers ])
930
931        # find the homeless shares:
932        homefull_shares = set([shnum for (server, shnum) in self.goal])
933        homeless_shares = set(range(self.total_shares)) - homefull_shares
934        homeless_shares = sorted(list(homeless_shares))
935        # place them somewhere. We prefer unused servers at the beginning of
936        # the available server list.
937
938        if not homeless_shares:
939            return
940
941        # if an old share X is on a node, put the new share X there too.
942        # TODO: 1: redistribute shares to achieve one-per-server, by copying
943        #       shares from existing servers to new (less-crowded) ones. The
944        #       old shares must still be updated.
945        # TODO: 2: move those shares instead of copying them, to reduce future
946        #       update work
947
948        # this is a bit CPU intensive but easy to analyze. We create a sort
949        # order for each server. If the server is marked as bad, we don't
950        # even put them in the list. Then we care about the number of shares
951        # which have already been assigned to them. After that we care about
952        # their permutation order.
953        old_assignments = DictOfSets()
954        for (server, shnum) in self.goal:
955            old_assignments.add(server, shnum)
956
957        serverlist = []
958        for i, server in enumerate(self.full_serverlist):
959            serverid = server.get_serverid()
960            if server in self.bad_servers:
961                continue
962            entry = (len(old_assignments.get(server, [])), i, serverid, server)
963            serverlist.append(entry)
964        serverlist.sort()
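        # (Illustrative: an unused server early in the permuted list sorts
        # as (0, i, ...), so it is preferred over a server that already
        # holds shares, which sorts as (1, ...) or higher.)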
965
966        if not serverlist:
967            raise NotEnoughServersError("Ran out of non-bad servers, "
968                                        "first_error=%s" %
969                                        str(self._first_write_error),
970                                        self._first_write_error)
971
972        # we then index this serverlist with an integer, because we may have
973        # to wrap. We update the goal as we go.
974        i = 0
975        for shnum in homeless_shares:
976            (ignored1, ignored2, ignored3, server) = serverlist[i]
977            # if we are forced to send a share to a server that already has
978            # one, we may have two write requests in flight, and the
979            # servermap (which was computed before either request was sent)
980            # won't reflect the new shares, so the second response will be
981            # surprising. There is code in _got_write_answer() to tolerate
982            # this, otherwise it would cause the publish to fail with an
983            # UncoordinatedWriteError. See #546 for details of the trouble
984            # this used to cause.
985            self.goal.add( (server, shnum) )
986            i += 1
987            if i >= len(serverlist):
988                i = 0
989        self.log_goal(self.goal, "after update: ")
990
991
992    def _got_write_answer(self, answer, writer, started):
993        if not answer:
994            # SDMF writers only pretend to write when readers set their
995            # blocks, salts, and so on -- they actually just write once,
996            # at the end of the upload process. In fake writes, they
997            # return defer.succeed(None). If we see that, we shouldn't
998            # bother checking it.
999            return
1000
1001        server = writer.server
1002        lp = self.log("_got_write_answer from %r, share %d" %
1003                      (server.get_name(), writer.shnum))
1004
1005        now = time.time()
1006        elapsed = now - started
1007
1008        self._status.add_per_server_time(server, elapsed)
1009
1010        wrote, read_data = answer
1011
1012        surprise_shares = set(read_data.keys()) - set([writer.shnum])
1013
1014        # We need to remove from surprise_shares any shares that we are
1015        # knowingly also writing to that server from other writers.
1016
1017        # TODO: Precompute this.
1018        shares = []
1019        for shnum, writers in self.writers.items():
1020            shares.extend([x.shnum for x in writers if x.server == server])
1021        known_shnums = set(shares)
1022        surprise_shares -= known_shnums
1023        self.log("found the following surprise shares: %s" %
1024                 str(surprise_shares))
1025
1026        # Now surprise shares contains all of the shares that we did not
1027        # expect to be there.
1028
1029        surprised = False
1030        for shnum in surprise_shares:
1031            # read_data is a dict mapping shnum to checkstring (SIGNED_PREFIX)
1032            checkstring = read_data[shnum][0]
1033            # What we want to do here is to see if their (seqnum,
1034            # roothash, salt) is the same as our (seqnum, roothash,
1035            # salt), or the equivalent for MDMF. The best way to do this
1036            # is to store a packed representation of our checkstring
1037            # somewhere, then not bother unpacking the other
1038            # checkstring.
1039            if checkstring == self._checkstring:
1040                # they have the right share, somehow
1041
1042                if (server,shnum) in self.goal:
1043                    # and we want them to have it, so we probably sent them a
1044                    # copy in an earlier write. This is ok, and avoids the
1045                    # #546 problem.
1046                    continue
1047
1048                # They aren't in our goal, but they are still for the right
1049                # version. Somebody else wrote them, and it's a convergent
1050                # uncoordinated write. Pretend this is ok (don't be
1051                # surprised), since I suspect there's a decent chance that
1052                # we'll hit this in normal operation.
1053                continue
1054
1055            else:
1056                # the new shares are of a different version
1057                if server in self._servermap.get_reachable_servers():
1058                    # we asked them about their shares, so we had knowledge
1059                    # of what they used to have. Any surprising shares must
1060                    # have come from someone else, so UCW.
1061                    surprised = True
1062                else:
1063                    # we didn't ask them, and now we've discovered that they
1064                    # have a share we didn't know about. This indicates that
1065                    # mapupdate should have worked harder and asked more
1066                    # servers before concluding that it knew about them all.
1067
1068                    # signal UCW, but make sure to ask this server next time,
1069                    # so we'll remember to update it if/when we retry.
1070                    surprised = True
1071                    # TODO: ask this server next time. I don't yet have a good
1072                    # way to do this. Two insufficient possibilities are:
1073                    #
1074                    # self._servermap.add_new_share(server, shnum, verinfo, now)
1075                    #  but that requires fetching/validating/parsing the whole
1076                    #  version string, and all we have is the checkstring
1077                    # self._servermap.mark_bad_share(server, shnum, checkstring)
1078                    #  that will make publish overwrite the share next time,
1079                    #  but it won't re-query the server, and it won't make
1080                    #  mapupdate search further
1081
1082                    # TODO later: when publish starts, do
1083                    # servermap.get_best_version(), extract the seqnum,
1084                    # subtract one, and store as highest-replaceable-seqnum.
1085                    # Then, if this surprise-because-we-didn't-ask share is
1086                    # of highest-replaceable-seqnum or lower, we're allowed
1087                    # to replace it: send out a new writev (or rather add it
1088                    # to self.goal and loop).
1089
1090                surprised = True
1091
1092        if surprised:
1093            self.log("they had shares %s that we didn't know about" %
1094                     (list(surprise_shares),),
1095                     parent=lp, level=log.WEIRD, umid="un9CSQ")
1096            self.surprised = True
1097
1098        if not wrote:
1099            # TODO: there are two possibilities. The first is that the server
1100            # is full (or just doesn't want to give us any room), which means
1101            # we shouldn't ask them again, but is *not* an indication of an
1102            # uncoordinated write. The second is that our testv failed, which
1103            # *does* indicate an uncoordinated write. We currently don't have
1104            # a way to tell these two apart (in fact, the storage server code
1105            # doesn't have the option of refusing our share).
1106            #
1107            # If the server is full, mark the server as bad (so we don't ask
1108            # them again), but don't set self.surprised. The loop() will find
1109            # a new server.
1110            #
1111            # If the testv failed, log it, set self.surprised, but don't
1112            # bother adding to self.bad_servers .
1113
1114            self.log("our testv failed, so the write did not happen",
1115                     parent=lp, level=log.WEIRD, umid="8sc26g")
1116            self.surprised = True
1117            self.bad_servers.add(server) # don't ask them again
1118            # use the checkstring to add information to the log message
1119            unknown_format = False
1120            for (shnum,readv) in list(read_data.items()):
1121                checkstring = readv[0]
1122                version = get_version_from_checkstring(checkstring)
1123                if version == MDMF_VERSION:
1124                    (other_seqnum,
1125                     other_roothash) = unpack_mdmf_checkstring(checkstring)
1126                elif version == SDMF_VERSION:
1127                    (other_seqnum,
1128                     other_roothash,
1129                     other_IV) = unpack_sdmf_checkstring(checkstring)
1130                else:
1131                    unknown_format = True
1132                expected_version = self._servermap.version_on_server(server,
1133                                                                     shnum)
1134                if expected_version:
1135                    (seqnum, root_hash, IV, segsize, datalength, k, N, prefix,
1136                     offsets_tuple) = expected_version
1137                    msg = ("somebody modified the share on us:"
1138                           " shnum=%d: I thought they had #%d:R=%r," %
1139                           (shnum,
1140                            seqnum, base32.b2a(root_hash)[:4]))
1141                    if unknown_format:
1142                        msg += (" but I don't know how to read share"
1143                                " format %d" % version)
1144                    else:
1145                        msg += " but testv reported #%d:R=%r" % \
1146                               (other_seqnum, base32.b2a(other_roothash)[:4])
1147                    self.log(msg, parent=lp, level=log.NOISY)
1148                # if expected_version==None, then we didn't expect to see a
1149                # share on that server, and the 'surprise_shares' clause
1150                # above will have logged it.
1151            return
1152
1153        # and update the servermap
1154        # self.versioninfo is set during the last phase of publishing.
1155        # If we get there, we know that responses correspond to placed
1156        # shares, and can safely execute these statements.
1157        if self.versioninfo:
1158            self.log("wrote successfully: adding new share to servermap")
1159            self._servermap.add_new_share(server, writer.shnum,
1160                                          self.versioninfo, started)
1161            self.placed.add( (server, writer.shnum) )
1162        self._update_status()
1163        # the next method in the deferred chain will check to see if
1164        # we're done and successful.
1165        return
1166
1167
1168    def _done(self):
1169        if not self._running:
1170            return
1171        self._running = False
1172        now = time.time()
1173        self._status.timings["total"] = now - self._started
1174
1175        elapsed = now - self._started_pushing
1176        self._status.timings['push'] = elapsed
1177
1178        self._status.set_active(False)
1179        self.log("Publish done, success")
1180        self._status.set_status("Finished")
1181        self._status.set_progress(1.0)
1182        # Get k and segsize, then give them to the caller.
1183        hints = {}
1184        hints['segsize'] = self.segment_size
1185        hints['k'] = self.required_shares
1186        self._node.set_downloader_hints(hints)
1187        eventually(self.done_deferred.callback, None)
1188
1189    def _failure(self, f=None):
1190        if f:
1191            self._last_failure = f
1192
1193        if not self.surprised:
1194            # We ran out of servers
1195            msg = "Publish ran out of good servers"
1196            if self._last_failure:
1197                msg += ", last failure was: %s" % str(self._last_failure)
1198            self.log(msg)
1199            e = NotEnoughServersError(msg)
1200
1201        else:
1202            # We ran into shares that we didn't recognize, which means
1203            # that we need to return an UncoordinatedWriteError.
1204            self.log("Publish failed with UncoordinatedWriteError")
1205            e = UncoordinatedWriteError()
1206        f = failure.Failure(e)
1207        eventually(self.done_deferred.callback, f)
1208
1209
1210@implementer(IMutableUploadable)
1211class MutableFileHandle(object):
1212    """
1213    I am a mutable uploadable built around a filehandle-like object,
1214    usually either a BytesIO instance or a handle to an actual file.
1215    """
1216
1217    def __init__(self, filehandle):
1218        # The filehandle is defined as a generally file-like object that
1219        # has these two methods. We don't care beyond that.
1220        assert hasattr(filehandle, "read")
1221        assert hasattr(filehandle, "close")
1222
1223        self._filehandle = filehandle
1224        # We must start reading at the beginning of the file, or we risk
1225        # encountering errors when the data read does not match the size
1226        # reported to the uploader.
1227        self._filehandle.seek(0)
1228
1229        # We have not yet read anything, so our position is 0.
1230        self._marker = 0
1231
1232
1233    def get_size(self):
1234        """
1235        I return the amount of data in my filehandle.
1236        """
1237        if not hasattr(self, "_size"):
1238            old_position = self._filehandle.tell()
1239            # Seek to the end of the file by seeking 0 bytes from the
1240            # file's end
1241            self._filehandle.seek(0, os.SEEK_END)
1242            self._size = self._filehandle.tell()
1243            # Restore the previous position, in case this was called
1244            # after a read.
1245            self._filehandle.seek(old_position)
1246            assert self._filehandle.tell() == old_position
1247
1248        assert hasattr(self, "_size")
1249        return self._size
1250
1251
1252    def pos(self):
1253        """
1254        I return the position of my read marker -- i.e., how much data I
1255        have already read and returned to callers.
1256        """
1257        return self._marker
1258
1259
1260    def read(self, length):
1261        """
1262        I return some data (up to length bytes) from my filehandle.
1263
1264        In most cases, I return length bytes, but sometimes I won't --
1265        for example, if I am asked to read beyond the end of a file, or
1266        an error occurs.
1267        """
1268        results = self._filehandle.read(length)
1269        self._marker += len(results)
1270        return [results]
1271
1272
1273    def close(self):
1274        """
1275        I close the underlying filehandle. Any further operations on the
1276        filehandle fail at this point.
1277        """
1278        self._filehandle.close()
1279
1280
1281class MutableData(MutableFileHandle):
1282    """
1283    I am a mutable uploadable built around a string, which I then cast
1284    into a BytesIO and treat as a filehandle.
1285    """
1286
1287    def __init__(self, s):
1288        # Take a string and return a file-like uploadable.
1289        assert isinstance(s, bytes)
1290
1291        MutableFileHandle.__init__(self, BytesIO(s))
1292
1293
1294@implementer(IMutableUploadable)
1295class TransformingUploadable(object):
1296    """
1297    I am an IMutableUploadable that wraps another IMutableUploadable,
1298    and some segments that are already on the grid. When I am called to
1299    read, I handle merging of boundary segments.
1300    """
1301
1302
1303    def __init__(self, data, offset, segment_size, start, end):
1304        assert IMutableUploadable.providedBy(data)
1305
1306        self._newdata = data
1307        self._offset = offset
1308        self._segment_size = segment_size
1309        self._start = start
1310        self._end = end
1311
1312        self._read_marker = 0
1313
1314        self._first_segment_offset = offset % segment_size
1315
1316        num = self.log("TransformingUploadable: starting", parent=None)
1317        self._log_number = num
1318        self.log("got fso: %d" % self._first_segment_offset)
1319        self.log("got offset: %d" % self._offset)
1320
1321
1322    def log(self, *args, **kwargs):
1323        if 'parent' not in kwargs:
1324            kwargs['parent'] = self._log_number
1325        if "facility" not in kwargs:
1326            kwargs["facility"] = "tahoe.mutable.transforminguploadable"
1327        return log.msg(*args, **kwargs)
1328
1329
1330    def get_size(self):
1331        return self._offset + self._newdata.get_size()
1332
1333
1334    def read(self, length):
1335        # We can get data from 3 sources here.
1336        #   1. The first of the segments provided to us.
1337        #   2. The data that we're replacing things with.
1338        #   3. The last of the segments provided to us.
1339
1340        # are we in state 0?
1341        self.log("reading %d bytes" % length)
1342
1343        old_start_data = b""
1344        old_data_length = self._first_segment_offset - self._read_marker
1345        if old_data_length > 0:
1346            if old_data_length > length:
1347                old_data_length = length
1348            self.log("returning %d bytes of old start data" % old_data_length)
1349
1350            old_data_end = old_data_length + self._read_marker
1351            old_start_data = self._start[self._read_marker:old_data_end]
1352            length -= old_data_length
1353        else:
1354            # otherwise calculations later get screwed up.
1355            old_data_length = 0
1356
1357        # Is there enough new data to satisfy this read? If not, we need
1358        # to pad the end of the data with data from our last segment.
1359        old_end_length = length - \
1360            (self._newdata.get_size() - self._newdata.pos())
1361        old_end_data = b""
1362        if old_end_length > 0:
1363            self.log("reading %d bytes of old end data" % old_end_length)
1364
1365            # TODO: We're not explicitly checking for tail segment size
1366            # here. Is that a problem?
1367            old_data_offset = (length - old_end_length + \
1368                               old_data_length) % self._segment_size
1369            self.log("reading at offset %d" % old_data_offset)
1370            old_end = old_data_offset + old_end_length
1371            old_end_data = self._end[old_data_offset:old_end]
1372            length -= old_end_length
1373            assert length == self._newdata.get_size() - self._newdata.pos()
1374
1375        self.log("reading %d bytes of new data" % length)
1376        new_data = self._newdata.read(length)
1377        new_data = b"".join(new_data)
1378
1379        self._read_marker += len(old_start_data + new_data + old_end_data)
1380
1381        return old_start_data + new_data + old_end_data
1382
1383    def close(self):
1384        pass