1 | Sun Jul 18 22:47:44 MDT 2010 zooko@zooko.com |
---|
2 | * trivial: rename and add in-line doc to clarify "used_peers" => "upload_servers" |
---|
3 | |
---|
4 | Sun Jul 18 22:49:48 MDT 2010 zooko@zooko.com |
---|
5 | * benchmarking: update bench_dirnode to be correct and use the shiniest new pyutil.benchutil features concerning what units you measure in |
---|
6 | |
---|
7 | Mon Jul 19 02:20:00 MDT 2010 zooko@zooko.com |
---|
8 | * immutable: use PrefixingLogMixin to organize logging in Tahoe2PeerSelector and add more detailed messages about peer |
---|
9 | |
---|
10 | New patches: |
---|
11 | |
---|
12 | [trivial: rename and add in-line doc to clarify "used_peers" => "upload_servers" |
---|
13 | zooko@zooko.com**20100719044744 |
---|
14 | Ignore-this: 93c42081676e0dea181e55187cfc506d |
---|
15 | ] { |
---|
16 | hunk ./src/allmydata/immutable/upload.py 178 |
---|
17 | """ |
---|
18 | @return: (used_peers, already_peers), where used_peers is a set of |
---|
19 | PeerTracker instances that have agreed to hold some shares |
---|
20 | - for us (the shnum is stashed inside the PeerTracker), |
---|
21 | + for us (the shareids are stashed inside the PeerTracker), |
---|
22 | and already_peers is a dict mapping shnum to a set of peers |
---|
23 | which claim to already have the share. |
---|
24 | """ |
---|
25 | hunk ./src/allmydata/immutable/upload.py 913 |
---|
26 | |
---|
27 | def set_shareholders(self, (used_peers, already_peers), encoder): |
---|
28 | """ |
---|
29 | - @param used_peers: a sequence of PeerTracker objects |
---|
30 | + @param used_peers: a sequence of PeerTracker objects that have agreed to hold some shares for us (the shareids are stashed inside the PeerTracker) |
---|
31 | @paran already_peers: a dict mapping sharenum to a set of peerids |
---|
32 | that claim to already have this share |
---|
33 | """ |
---|
34 | replace ./src/allmydata/immutable/upload.py [A-Za-z_0-9] used_peers upload_servers |
---|
35 | replace ./src/allmydata/test/test_upload.py [A-Za-z_0-9] used_peers upload_servers |
---|
36 | replace ./src/allmydata/util/happinessutil.py [A-Za-z_0-9] used_peers upload_servers |
---|
37 | } |
---|
38 | [benchmarking: update bench_dirnode to be correct and use the shiniest new pyutil.benchutil features concerning what units you measure in |
---|
39 | zooko@zooko.com**20100719044948 |
---|
40 | Ignore-this: b72059e4ff921741b490e6b47ec687c6 |
---|
41 | ] { |
---|
42 | hunk ./src/allmydata/test/bench_dirnode.py 38 |
---|
43 | def create_from_cap(self, writecap, readcap=None, deep_immutable=False, name=''): |
---|
44 | return FakeNode() |
---|
45 | |
---|
46 | -def random_unicode(l): |
---|
47 | - while True: |
---|
48 | +def random_unicode(n=140, b=3, codec='utf-8'): |
---|
49 | + l = [] |
---|
50 | + while len(l) < n: |
---|
51 | try: |
---|
52 | hunk ./src/allmydata/test/bench_dirnode.py 42 |
---|
53 | - return os.urandom(l).decode('utf-8') |
---|
54 | + u = os.urandom(b).decode(codec)[0] |
---|
55 | except UnicodeDecodeError: |
---|
56 | pass |
---|
57 | hunk ./src/allmydata/test/bench_dirnode.py 45 |
---|
58 | + else: |
---|
59 | + l.append(u) |
---|
60 | + return u''.join(l) |
---|
61 | |
---|
62 | encoding_parameters = {"k": 3, "n": 10} |
---|
63 | def random_metadata(): |
---|
64 | hunk ./src/allmydata/test/bench_dirnode.py 52 |
---|
65 | d = {} |
---|
66 | - d['ctime'] = random.random() |
---|
67 | - d['mtime'] = random.random() |
---|
68 | d['tahoe'] = {} |
---|
69 | d['tahoe']['linkcrtime'] = random.random() |
---|
70 | d['tahoe']['linkmotime'] = random.random() |
---|
71 | hunk ./src/allmydata/test/bench_dirnode.py 93 |
---|
72 | |
---|
73 | def init_for_pack(self, N): |
---|
74 | for i in xrange(len(self.children), N): |
---|
75 | - name = random_unicode(random.randrange(1, 9)) |
---|
76 | + name = random_unicode(random.randrange(0, 10)) |
---|
77 | self.children.append( (name, self.random_child()) ) |
---|
78 | |
---|
79 | def init_for_unpack(self, N): |
---|
80 | hunk ./src/allmydata/test/bench_dirnode.py 114 |
---|
81 | (self.init_for_pack, self.pack), |
---|
82 | (self.init_for_unpack, self.unpack_and_repack)]: |
---|
83 | print "benchmarking %s" % (func,) |
---|
84 | - benchutil.bench(self.unpack_and_repack, initfunc=self.init_for_unpack, |
---|
85 | - TOPXP=12)#, profile=profile, profresults=PROF_FILE_NAME) |
---|
86 | + for N in 16, 512, 2048, 16384: |
---|
87 | + print "%5d" % N, |
---|
88 | + benchutil.rep_bench(func, N, initfunc=initfunc, MAXREPS=20, UNITS_PER_SECOND=1000) |
---|
89 | + benchutil.print_bench_footer(UNITS_PER_SECOND=1000) |
---|
90 | + print "(milliseconds)" |
---|
91 | |
---|
92 | def prof_benchmarks(self): |
---|
93 | # This requires pyutil >= v1.3.34. |
---|
94 | } |
---|
95 | [immutable: use PrefixingLogMixin to organize logging in Tahoe2PeerSelector and add more detailed messages about peer |
---|
96 | zooko@zooko.com**20100719082000 |
---|
97 | Ignore-this: e034c4988b327f7e138a106d913a3082 |
---|
98 | ] { |
---|
99 | hunk ./src/allmydata/immutable/upload.py 77 |
---|
100 | # TODO: actual extensions are closer to 419 bytes, so we can probably lower |
---|
101 | # this. |
---|
102 | |
---|
103 | +def pretty_print_shnum_to_servers(s): |
---|
104 | + return ', '.join([ "sh%s: %s" % (k, '+'.join([idlib.shortnodeid_b2a(x) for x in v])) for k, v in s.iteritems() ]) |
---|
105 | + |
---|
106 | class PeerTracker: |
---|
107 | def __init__(self, peerid, storage_server, |
---|
108 | sharesize, blocksize, num_segments, num_share_hashes, |
---|
109 | hunk ./src/allmydata/immutable/upload.py 158 |
---|
110 | del self.buckets[sharenum] |
---|
111 | |
---|
112 | |
---|
113 | -class Tahoe2PeerSelector: |
---|
114 | +class Tahoe2PeerSelector(log.PrefixingLogMixin): |
---|
115 | |
---|
116 | def __init__(self, upload_id, logparent=None, upload_status=None): |
---|
117 | self.upload_id = upload_id |
---|
118 | hunk ./src/allmydata/immutable/upload.py 169 |
---|
119 | self.num_peers_contacted = 0 |
---|
120 | self.last_failure_msg = None |
---|
121 | self._status = IUploadStatus(upload_status) |
---|
122 | - self._log_parent = log.msg("%s starting" % self, parent=logparent) |
---|
123 | + log.PrefixingLogMixin.__init__(self, 'tahoe.immutable.upload', logparent, prefix=upload_id) |
---|
124 | + self.log("starting", level=log.OPERATIONAL) |
---|
125 | |
---|
126 | def __repr__(self): |
---|
127 | return "<Tahoe2PeerSelector for upload %s>" % self.upload_id |
---|
128 | hunk ./src/allmydata/immutable/upload.py 275 |
---|
129 | ds.append(d) |
---|
130 | self.num_peers_contacted += 1 |
---|
131 | self.query_count += 1 |
---|
132 | - log.msg("asking peer %s for any existing shares for " |
---|
133 | - "upload id %s" |
---|
134 | - % (idlib.shortnodeid_b2a(peer.peerid), self.upload_id), |
---|
135 | - level=log.NOISY, parent=self._log_parent) |
---|
136 | + self.log("asking peer %s for any existing shares" % |
---|
137 | + (idlib.shortnodeid_b2a(peer.peerid),), |
---|
138 | + level=log.NOISY) |
---|
139 | dl = defer.DeferredList(ds) |
---|
140 | dl.addCallback(lambda ign: self._loop()) |
---|
141 | return dl |
---|
142 | hunk ./src/allmydata/immutable/upload.py 289 |
---|
143 | Tahoe2PeerSelector._existing_shares. |
---|
144 | """ |
---|
145 | if isinstance(res, failure.Failure): |
---|
146 | - log.msg("%s got error during existing shares check: %s" |
---|
147 | + self.log("%s got error during existing shares check: %s" |
---|
148 | % (idlib.shortnodeid_b2a(peer), res), |
---|
149 | hunk ./src/allmydata/immutable/upload.py 291 |
---|
150 | - level=log.UNUSUAL, parent=self._log_parent) |
---|
151 | + level=log.UNUSUAL) |
---|
152 | self.error_count += 1 |
---|
153 | self.bad_query_count += 1 |
---|
154 | else: |
---|
155 | hunk ./src/allmydata/immutable/upload.py 298 |
---|
156 | buckets = res |
---|
157 | if buckets: |
---|
158 | self.peers_with_shares.add(peer) |
---|
159 | - log.msg("response from peer %s: alreadygot=%s" |
---|
160 | + self.log("response to get_buckets() from peer %s: alreadygot=%s" |
---|
161 | % (idlib.shortnodeid_b2a(peer), tuple(sorted(buckets))), |
---|
162 | hunk ./src/allmydata/immutable/upload.py 300 |
---|
163 | - level=log.NOISY, parent=self._log_parent) |
---|
164 | + level=log.NOISY) |
---|
165 | for bucket in buckets: |
---|
166 | self.preexisting_shares.setdefault(bucket, set()).add(peer) |
---|
167 | if self.homeless_shares and bucket in self.homeless_shares: |
---|
168 | hunk ./src/allmydata/immutable/upload.py 334 |
---|
169 | merged = merge_peers(self.preexisting_shares, self.use_peers) |
---|
170 | effective_happiness = servers_of_happiness(merged) |
---|
171 | if self.servers_of_happiness <= effective_happiness: |
---|
172 | - msg = ("peer selection successful for %s: %s" % (self, |
---|
173 | - self._get_progress_message())) |
---|
174 | - log.msg(msg, parent=self._log_parent) |
---|
175 | + msg = ("server selection successful for %s: %s: pretty_print_merged: %s, self.use_peers: %s, self.preexisting_shares: %s" % (self, self._get_progress_message(), pretty_print_shnum_to_servers(merged), [', '.join(["%s: %s" % (k, idlib.shortnodeid_b2a(v._nodeid),) for k,v in p.buckets.iteritems()]) for p in self.use_peers], pretty_print_shnum_to_servers(self.preexisting_shares))) |
---|
176 | + self.log(msg, level=log.OPERATIONAL) |
---|
177 | return (self.use_peers, self.preexisting_shares) |
---|
178 | else: |
---|
179 | # We're not okay right now, but maybe we can fix it by |
---|
180 | hunk ./src/allmydata/immutable/upload.py 379 |
---|
181 | self.needed_shares, |
---|
182 | self.servers_of_happiness, |
---|
183 | effective_happiness) |
---|
184 | - log.msg("server selection unsuccessful for %r: %s (%s), merged=%r" |
---|
185 | - % (self, msg, self._get_progress_message(), merged), level=log.INFREQUENT) |
---|
186 | + self.log("server selection unsuccessful for %r: %s (%s), merged=%s" % (self, msg, self._get_progress_message(), pretty_print_shnum_to_servers(merged)), level=log.INFREQUENT) |
---|
187 | return self._failed("%s (%s)" % (msg, self._get_progress_message())) |
---|
188 | |
---|
189 | if self.uncontacted_peers: |
---|
190 | hunk ./src/allmydata/immutable/upload.py 402 |
---|
191 | elif self.contacted_peers: |
---|
192 | # ask a peer that we've already asked. |
---|
193 | if not self._started_second_pass: |
---|
194 | - log.msg("starting second pass", parent=self._log_parent, |
---|
195 | + self.log("starting second pass", |
---|
196 | level=log.NOISY) |
---|
197 | self._started_second_pass = True |
---|
198 | num_shares = mathutil.div_ceil(len(self.homeless_shares), |
---|
199 | hunk ./src/allmydata/immutable/upload.py 440 |
---|
200 | self._get_progress_message())) |
---|
201 | if self.last_failure_msg: |
---|
202 | msg += " (%s)" % (self.last_failure_msg,) |
---|
203 | - log.msg(msg, level=log.UNUSUAL, parent=self._log_parent) |
---|
204 | + self.log(msg, level=log.UNUSUAL) |
---|
205 | return self._failed(msg) |
---|
206 | else: |
---|
207 | # we placed enough to be happy, so we're done |
---|
208 | hunk ./src/allmydata/immutable/upload.py 446 |
---|
209 | if self._status: |
---|
210 | self._status.set_status("Placed all shares") |
---|
211 | + msg = ("server selection successful (no more servers) for %s: %s: %s" % (self, |
---|
212 | + self._get_progress_message(), pretty_print_shnum_to_servers(merged))) |
---|
213 | + self.log(msg, level=log.OPERATIONAL) |
---|
214 | return (self.use_peers, self.preexisting_shares) |
---|
215 | |
---|
216 | def _got_response(self, res, peer, shares_to_ask, put_peer_here): |
---|
217 | hunk ./src/allmydata/immutable/upload.py 455 |
---|
218 | if isinstance(res, failure.Failure): |
---|
219 | # This is unusual, and probably indicates a bug or a network |
---|
220 | # problem. |
---|
221 | - log.msg("%s got error during peer selection: %s" % (peer, res), |
---|
222 | - level=log.UNUSUAL, parent=self._log_parent) |
---|
223 | + self.log("%s got error during peer selection: %s" % (peer, res), |
---|
224 | + level=log.UNUSUAL) |
---|
225 | self.error_count += 1 |
---|
226 | self.bad_query_count += 1 |
---|
227 | self.homeless_shares = list(shares_to_ask) + self.homeless_shares |
---|
228 | hunk ./src/allmydata/immutable/upload.py 475 |
---|
229 | self.last_failure_msg = msg |
---|
230 | else: |
---|
231 | (alreadygot, allocated) = res |
---|
232 | - log.msg("response from peer %s: alreadygot=%s, allocated=%s" |
---|
233 | + self.log("response to allocate_buckets() from peer %s: alreadygot=%s, allocated=%s" |
---|
234 | % (idlib.shortnodeid_b2a(peer.peerid), |
---|
235 | tuple(sorted(alreadygot)), tuple(sorted(allocated))), |
---|
236 | hunk ./src/allmydata/immutable/upload.py 478 |
---|
237 | - level=log.NOISY, parent=self._log_parent) |
---|
238 | + level=log.NOISY) |
---|
239 | progress = False |
---|
240 | for s in alreadygot: |
---|
241 | self.preexisting_shares.setdefault(s, set()).add(peer.peerid) |
---|
242 | hunk ./src/allmydata/immutable/upload.py 921 |
---|
243 | @paran already_peers: a dict mapping sharenum to a set of peerids |
---|
244 | that claim to already have this share |
---|
245 | """ |
---|
246 | - self.log("_send_shares, upload_servers is %s" % (upload_servers,)) |
---|
247 | + self.log("set_shareholders; upload_servers is %s, already_peers is %s" % ([', '.join(["%s: %s" % (k, idlib.shortnodeid_b2a(v._nodeid),) for k,v in p.buckets.iteritems()]) for p in upload_servers], already_peers)) |
---|
248 | # record already-present shares in self._results |
---|
249 | self._results.preexisting_shares = len(already_peers) |
---|
250 | |
---|
251 | hunk ./src/allmydata/immutable/upload.py 935 |
---|
252 | for shnum in peer.buckets: |
---|
253 | self._peer_trackers[shnum] = peer |
---|
254 | servermap.setdefault(shnum, set()).add(peer.peerid) |
---|
255 | + self.log("set_shareholders; %s (%s) == %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in upload_servers]), [(p.buckets, p.peerid) for p in upload_servers])) |
---|
256 | assert len(buckets) == sum([len(peer.buckets) for peer in upload_servers]), "%s (%s) != %s (%s)" % (len(buckets), buckets, sum([len(peer.buckets) for peer in upload_servers]), [(p.buckets, p.peerid) for p in upload_servers]) |
---|
257 | encoder.set_shareholders(buckets, servermap) |
---|
258 | |
---|
259 | hunk ./src/allmydata/storage/server.py 8 |
---|
260 | |
---|
261 | from zope.interface import implements |
---|
262 | from allmydata.interfaces import RIStorageServer, IStatsProducer |
---|
263 | -from allmydata.util import fileutil, log, time_format |
---|
264 | +from allmydata.util import fileutil, idlib, log, time_format |
---|
265 | import allmydata # for __full_version__ |
---|
266 | |
---|
267 | from allmydata.storage.common import si_b2a, si_a2b, storage_index_to_dir |
---|
268 | hunk ./src/allmydata/storage/server.py 109 |
---|
269 | expiration_sharetypes) |
---|
270 | self.lease_checker.setServiceParent(self) |
---|
271 | |
---|
272 | + def __repr__(self): |
---|
273 | + return "<StorageServer %s>" % (idlib.shortnodeid_b2a(self.my_nodeid),) |
---|
274 | + |
---|
275 | def add_bucket_counter(self): |
---|
276 | statefile = os.path.join(self.storedir, "bucket_counter.state") |
---|
277 | self.bucket_counter = BucketCountingCrawler(self, statefile) |
---|
278 | hunk ./src/allmydata/test/test_upload.py 14 |
---|
279 | from allmydata import uri, monitor, client |
---|
280 | from allmydata.immutable import upload, encode |
---|
281 | from allmydata.interfaces import FileTooLargeError, UploadUnhappinessError |
---|
282 | +from allmydata.util import log |
---|
283 | from allmydata.util.assertutil import precondition |
---|
284 | from allmydata.util.deferredutil import DeferredListShouldSucceed |
---|
285 | from allmydata.test.no_network import GridTestMixin |
---|
286 | hunk ./src/allmydata/test/test_upload.py 714 |
---|
287 | |
---|
288 | def is_happy_enough(servertoshnums, h, k): |
---|
289 | """ I calculate whether servertoshnums achieves happiness level h. I do this with a naïve "brute force search" approach. (See src/allmydata/util/happinessutil.py for a better algorithm.) """ |
---|
290 | + print "servertoshnums: ", servertoshnums, "h: ", h, "k: ", k |
---|
291 | if len(servertoshnums) < h: |
---|
292 | return False |
---|
293 | # print "servertoshnums: ", servertoshnums, h, k |
---|
294 | hunk ./src/allmydata/test/test_upload.py 803 |
---|
295 | def _add_server(self, server_number, readonly=False): |
---|
296 | assert self.g, "I tried to find a grid at self.g, but failed" |
---|
297 | ss = self.g.make_server(server_number, readonly) |
---|
298 | + log.msg("just created a server, number: %s => %s" % (server_number, ss,)) |
---|
299 | self.g.add_server(server_number, ss) |
---|
300 | |
---|
301 | hunk ./src/allmydata/test/test_upload.py 806 |
---|
302 | - |
---|
303 | def _add_server_with_share(self, server_number, share_number=None, |
---|
304 | readonly=False): |
---|
305 | self._add_server(server_number, readonly) |
---|
306 | hunk ./src/allmydata/test/test_upload.py 866 |
---|
307 | d.addCallback(_store_shares) |
---|
308 | return d |
---|
309 | |
---|
310 | - |
---|
311 | def test_configure_parameters(self): |
---|
312 | self.basedir = self.mktemp() |
---|
313 | hooks = {0: self._set_up_nodes_extra_config} |
---|
314 | } |
---|
315 | |
---|
316 | Context: |
---|
317 | |
---|
318 | [TAG allmydata-tahoe-1.7.1 |
---|
319 | zooko@zooko.com**20100719131352 |
---|
320 | Ignore-this: 6942056548433dc653a746703819ad8c |
---|
321 | ] |
---|
322 | Patch bundle hash: |
---|
323 | 23cbf4e3d70f30b162d9e51c64c64b100923c5c6 |
---|