Changeset 3e4342ec in trunk
Timestamp:  2010-02-01T06:16:10Z (15 years ago)
Branches:   master
Children:   57e3af1
Parents:    e4e2599
Location:   src/allmydata
Files:      3 edited
src/allmydata/immutable/download.py
--- src/allmydata/immutable/download.py (re4e2599)
+++ src/allmydata/immutable/download.py (r3e4342ec)
@@ -790,5 +790,8 @@
         self.active_buckets = {} # k: shnum, v: bucket
         self._share_buckets = {} # k: sharenum, v: list of buckets
-        self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
+
+        # _download_all_segments() will set this to:
+        # self._share_vbuckets = {} # k: shnum, v: set of ValidatedBuckets
+        self._share_vbuckets = None

         self._fetch_failures = {"uri_extension": 0, "crypttext_hash_tree": 0, }
@@ -809,4 +812,8 @@
         # self._responses_received = 0
         # self._queries_failed = 0
+
+        # This is solely for the use of unit tests. It will be triggered when
+        # we start downloading shares.
+        self._stage_4_d = defer.Deferred()

     def pauseProducing(self):
@@ -939,4 +946,8 @@
         self._wait_for_enough_buckets_d = None

+        if self._share_vbuckets is not None:
+            vbucket = ValidatedReadBucketProxy(sharenum, b, self._share_hash_tree, self._vup.num_segments, self._vup.block_size, self._vup.share_size)
+            self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)
+
         if self._results:
             if peerid not in self._results.servermap:
@@ -1089,4 +1100,11 @@

     def _download_all_segments(self, res):
+        # From now on if new buckets are received then I will notice that
+        # self._share_vbuckets is not None and generate a vbucket for that new
+        # bucket and add it in to _share_vbuckets. (We had to wait because we
+        # didn't have self._vup and self._share_hash_tree earlier. We didn't
+        # need validated buckets until now -- now that we are ready to download
+        # shares.)
+        self._share_vbuckets = {}
         for sharenum, buckets in self._share_buckets.iteritems():
             for bucket in buckets:
@@ -1110,4 +1128,6 @@
             # happening until the consumer is ready for more data.
             d.addCallback(self._check_for_pause)
+
+        self._stage_4_d.callback(None)
         return d
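The heart of this change is a sentinel: _share_vbuckets stays None until _download_all_segments() runs, after which any bucket that arrives late is validated and registered on the spot instead of being dropped. A minimal sketch of the pattern, with hypothetical class and method names (only ValidatedReadBucketProxy comes from the real code, and _validate() stands in for constructing one):

    class SketchDownloader(object):
        """Illustrative only, not the real Tahoe-LAFS downloader API."""
        def __init__(self):
            self._share_buckets = {}     # k: sharenum, v: list of raw buckets
            self._share_vbuckets = None  # None means "not ready to validate yet"

        def bucket_arrived(self, sharenum, bucket):
            self._share_buckets.setdefault(sharenum, []).append(bucket)
            if self._share_vbuckets is not None:
                # share download already started: validate the late arrival
                # immediately instead of silently dropping it
                vbucket = self._validate(sharenum, bucket)
                self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)

        def start_downloading_shares(self):
            # the hash tree and URI extension are now known, so flip the
            # sentinel to a real dict and validate every bucket seen so far
            self._share_vbuckets = {}
            for sharenum, buckets in self._share_buckets.items():
                for b in buckets:
                    vbucket = self._validate(sharenum, b)
                    self._share_vbuckets.setdefault(sharenum, set()).add(vbucket)

        def _validate(self, sharenum, bucket):
            # stand-in for ValidatedReadBucketProxy(sharenum, bucket, ...)
            return (sharenum, bucket)

This is what lets a server that recovers mid-download contribute: its buckets now join the validated pool rather than arriving after that pool was already built.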
src/allmydata/test/no_network.py
--- src/allmydata/test/no_network.py (re4e2599)
+++ src/allmydata/test/no_network.py (r3e4342ec)
@@ -253,10 +253,19 @@
     def break_server(self, serverid):
         # mark the given server as broken, so it will throw exceptions when
-        # asked to hold a share
+        # asked to hold a share or serve a share
         self.servers_by_id[serverid].broken = True

-    def hang_server(self, serverid, until=defer.Deferred()):
-        # hang the given server until 'until' fires
-        self.servers_by_id[serverid].hung_until = until
+    def hang_server(self, serverid):
+        # hang the given server
+        ss = self.servers_by_id[serverid]
+        assert ss.hung_until is None
+        ss.hung_until = defer.Deferred()
+
+    def unhang_server(self, serverid):
+        # unhang the given server
+        ss = self.servers_by_id[serverid]
+        assert ss.hung_until is not None
+        ss.hung_until.callback(None)
+        ss.hung_until = None
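The new pair makes hangs deterministic: the server's hung_until Deferred gates incoming requests, and unhang_server() releases everything queued behind it. A simplified illustration of how such a gate can work (the real request wrapper lives in no_network.py; HangableServer here is hypothetical):

    from twisted.internet import defer

    class HangableServer(object):
        def __init__(self):
            self.hung_until = None  # a Deferred while hung, else None

        def call(self, f, *args):
            # deliver the request now, or queue it until the hang is released
            if self.hung_until is not None:
                d = defer.Deferred()
                self.hung_until.addCallback(lambda ign: d.callback(f(*args)))
                return d
            return defer.succeed(f(*args))

    s = HangableServer()
    s.hung_until = defer.Deferred()     # hang_server(): requests now queue up
    d = s.call(lambda: "share data")    # d has not fired yet
    hung, s.hung_until = s.hung_until, None
    hung.callback(None)                 # unhang_server(): queued requests proceed

Compared to the old until=defer.Deferred() default argument (a classic Python pitfall: the default Deferred is created once at definition time and shared across calls) plus a reactor.callLater(5, ...) in the tests, the hang/unhang pair removes both the shared mutable default and the wall-clock dependency.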
src/allmydata/test/test_hung_server.py
--- src/allmydata/test/test_hung_server.py (re4e2599)
+++ src/allmydata/test/test_hung_server.py (r3e4342ec)
@@ -2,5 +2,5 @@
 import os, shutil
 from twisted.trial import unittest
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from allmydata import uri
 from allmydata.util.consumer import download_to_data
@@ -9,5 +9,5 @@
 from allmydata.storage.common import storage_index_to_dir
 from allmydata.test.no_network import GridTestMixin
-from allmydata.test.common import ShouldFailMixin
+from allmydata.test.common import ShouldFailMixin, _corrupt_share_data
 from allmydata.interfaces import NotEnoughSharesError

@@ -25,4 +25,8 @@
         for (id, ss) in servers:
             self.g.hang_server(id, **kwargs)
+
+    def _unhang(self, servers, **kwargs):
+        for (id, ss) in servers:
+            self.g.unhang_server(id, **kwargs)

     def _delete_all_shares_from(self, servers):
@@ -31,4 +35,10 @@
             if i_serverid in serverids:
                 os.unlink(i_sharefile)
+
+    def _corrupt_all_shares_in(self, servers, corruptor_func):
+        serverids = [id for (id, ss) in servers]
+        for (i_shnum, i_serverid, i_sharefile) in self.shares:
+            if i_serverid in serverids:
+                self._corrupt_share((i_shnum, i_sharefile), corruptor_func)

     def _copy_all_shares_from(self, from_servers, to_server):
@@ -53,3 +63,12 @@
                            in self.shares)

+    def _corrupt_share(self, share, corruptor_func):
+        (sharenum, sharefile) = share
+        data = open(sharefile, "rb").read()
+        newdata = corruptor_func(data)
+        os.unlink(sharefile)
+        wf = open(sharefile, "wb")
+        wf.write(newdata)
+        wf.close()
+
     def _set_up(self, mutable, testdir, num_clients=1, num_servers=10):
@@ -81,14 +100,29 @@
         return d

-    def _check_download(self):
+    def _start_download(self):
         n = self.c0.create_node_from_uri(self.uri)
         if self.mutable:
             d = n.download_best_version()
-            expected_plaintext = mutable_plaintext
+            stage_4_d = None # currently we aren't doing any tests which require this for mutable files
         else:
             d = download_to_data(n)
-            expected_plaintext = immutable_plaintext
-        def _got_data(data):
-            self.failUnlessEqual(data, expected_plaintext)
-        d.addCallback(_got_data)
+            stage_4_d = n._downloader._all_downloads.keys()[0]._stage_4_d # too ugly! FIXME
+        return (d, stage_4_d,)
+
+    def _wait_for_data(self, n):
+        if self.mutable:
+            d = n.download_best_version()
+        else:
+            d = download_to_data(n)
+        return d
+
+    def _check(self, resultingdata):
+        if self.mutable:
+            self.failUnlessEqual(resultingdata, mutable_plaintext)
+        else:
+            self.failUnlessEqual(resultingdata, immutable_plaintext)
+
+    def _download_and_check(self):
+        d, stage4d = self._start_download()
+        d.addCallback(self._check)
         return d
@@ -98,8 +132,8 @@
             return self.shouldFail(UnrecoverableFileError, self.basedir,
                                    "no recoverable versions",
-                                   self._check_download)
+                                   self._download_and_check)
         else:
             return self.shouldFail(NotEnoughSharesError, self.basedir,
                                    "Failed to get enough shareholders",
-                                   self._check_download)
+                                   self._download_and_check)

@@ -109,4 +143,4 @@
         for mutable in [False, True]:
             d.addCallback(lambda ign: self._set_up(mutable, "test_10_good_sanity_check"))
-            d.addCallback(lambda ign: self._check_download())
+            d.addCallback(lambda ign: self._download_and_check())
         return d
@@ -117,4 +151,4 @@
             d.addCallback(lambda ign: self._set_up(mutable, "test_10_good_copied_share"))
             d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[2:3], self.servers[0]))
-            d.addCallback(lambda ign: self._check_download())
+            d.addCallback(lambda ign: self._download_and_check())
         return d
@@ -125,4 +159,4 @@
             d.addCallback(lambda ign: self._set_up(mutable, "test_3_good_7_noshares"))
             d.addCallback(lambda ign: self._delete_all_shares_from(self.servers[3:]))
-            d.addCallback(lambda ign: self._check_download())
+            d.addCallback(lambda ign: self._download_and_check())
         return d
@@ -150,4 +184,4 @@
             d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[2:3], self.servers[0]))
             d.addCallback(lambda ign: self._break(self.servers[2:]))
-            d.addCallback(lambda ign: self._check_download())
+            d.addCallback(lambda ign: self._download_and_check())
         return d
@@ -169,4 +203,4 @@
             d.addCallback(lambda ign: self._set_up(mutable, "test_3_good_7_hung"))
             d.addCallback(lambda ign: self._hang(self.servers[3:]))
-            d.addCallback(lambda ign: self._check_download())
+            d.addCallback(lambda ign: self._download_and_check())
         return d
@@ -175,9 +209,8 @@
         d = defer.succeed(None)
         for mutable in [False]:
-            recovered = defer.Deferred()
             d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_hung_then_1_recovers"))
-            d.addCallback(lambda ign: self._hang(self.servers[2:3], until=recovered))
-            d.addCallback(lambda ign: self._hang(self.servers[3:]))
-            d.addCallback(lambda ign: reactor.callLater(5, recovered.callback, None))
-            d.addCallback(lambda ign: self._check_download())
+            d.addCallback(lambda ign: self._hang(self.servers[2:3]))
+            d.addCallback(lambda ign: self._hang(self.servers[3:]))
+            d.addCallback(lambda ign: self._unhang(self.servers[2:3]))
+            d.addCallback(lambda ign: self._download_and_check())
         return d
@@ -186,10 +219,26 @@
         d = defer.succeed(None)
         for mutable in [False]:
-            recovered = defer.Deferred()
             d.addCallback(lambda ign: self._set_up(mutable, "test_2_good_8_hung_then_1_recovers_with_2_shares"))
             d.addCallback(lambda ign: self._copy_all_shares_from(self.servers[0:1], self.servers[2]))
-            d.addCallback(lambda ign: self._hang(self.servers[2:3], until=recovered))
-            d.addCallback(lambda ign: self._hang(self.servers[3:]))
-            d.addCallback(lambda ign: reactor.callLater(5, recovered.callback, None))
-            d.addCallback(lambda ign: self._check_download())
-        return d
+            d.addCallback(lambda ign: self._hang(self.servers[2:3]))
+            d.addCallback(lambda ign: self._hang(self.servers[3:]))
+            d.addCallback(lambda ign: self._unhang(self.servers[2:3]))
+            d.addCallback(lambda ign: self._download_and_check())
+        return d
+
+    def test_failover_during_stage_4(self):
+        # See #287
+        d = defer.succeed(None)
+        for mutable in [False]:
+            d.addCallback(lambda ign: self._set_up(mutable, "test_failover_during_stage_4"))
+            d.addCallback(lambda ign: self._corrupt_all_shares_in(self.servers[2:3], _corrupt_share_data))
+            d.addCallback(lambda ign: self._set_up(mutable, "test_failover_during_stage_4"))
+            d.addCallback(lambda ign: self._hang(self.servers[3:]))
+            d.addCallback(lambda ign: self._start_download())
+            def _after_starting_download((doned, started4d)):
+                started4d.addCallback(lambda ign: self._unhang(self.servers[3:4]))
+                doned.addCallback(self._check)
+                return doned
+            d.addCallback(_after_starting_download)
+
+        return d
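test_failover_during_stage_4 is the payoff: it hangs the servers, starts a download, and uses the _stage_4_d hook to unhang a server only after share download has actually begun, so the downloader must fail over mid-stage-4 (ticket #287). The sequencing, reduced to its essentials; failover_scenario and its arguments are illustrative stand-ins for the test's helpers:

    from twisted.internet import defer

    def failover_scenario(start_download, unhang_spare, check):
        # start_download() returns (done_d, stage_4_d), like _start_download()
        done_d, stage_4_d = start_download()
        # release the hung server only once stage 4 has actually begun,
        # forcing the downloader to fail over to it mid-download
        stage_4_d.addCallback(lambda ign: unhang_spare())
        done_d.addCallback(check)
        return done_d

    # toy run: stage 4 "begins" immediately; the download finishes afterwards
    events = []
    done_d = defer.Deferred()
    failover_scenario(lambda: (done_d, defer.succeed(None)),
                      lambda: events.append("unhung"),
                      lambda data: events.append(("checked", data)))
    done_d.callback("plaintext")
    assert events == ["unhung", ("checked", "plaintext")]

Without the hook, the test could only guess with timers when stage 4 had started; with it, the unhang is ordered after the event itself, making the race deterministic.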