Ticket #587: test_ambient_upload_authority.py

File test_ambient_upload_authority.py, 3.7 KB (added by toby.murray, at 2009-01-17T01:50:30Z)

A test case for the `web.ambient_upload_authority` configuration parameter, with one test for each of its two boolean states (enabled and disabled).

Line 
1import os
2
3from twisted.trial import unittest
4from twisted.internet import defer, reactor
5from twisted.python import log
6from twisted.web import client, http
7
8from allmydata.test.common import SystemTestMixin
9
10
class TestCase(SystemTestMixin, unittest.TestCase):
    """Shared scaffolding for the web.ambient_upload_authority tests.

    This class defines no test_* methods itself. A subclass selects the
    boolean under test with setAmbientUploadAuthority() and then returns
    the result of _test_ambient_upload_authority() from its own test
    method.
    """

    def setAmbientUploadAuthority(self, ambientUploadAuthority):
        """Remember the boolean value of the option under test.

        The value is written into tahoe.cfg by _set_up_nodes_extra_config()
        and decides which HTTP status the requests are expected to return.
        """
        self.ambientUploadAuthority = ambientUploadAuthority

    def _test_ambient_upload_authority(self):
        """Start one node, then fire the upload requests at its web port.

        Returns the Deferred from set_up_nodes() with the request phase
        chained on; any failure is routed into self.fail().
        """
        self.webip = "127.0.0.1"
        # NOTE(review): fixed port number; it has to be free on the machine
        # running the test.
        self.webport = 3456
        self.basedir = self.mktemp()

        # set up an introducer and a node
        d = self.set_up_nodes(1)
        d.addCallback(self._test_ambient_upload_authority2)
        d.addErrback(self.fail)
        return d

    def _set_up_nodes_extra_config(self):
        """Replace client0's configuration with one carrying the option.

        Removes the old-style 'webport' file and writes a fresh tahoe.cfg
        containing web.ambient_upload_authority, the web port, the
        introducer furl and an enabled storage section.

        NOTE(review): presumably called by SystemTestMixin.set_up_nodes()
        before the client is started -- confirm in allmydata.test.common.
        """
        client_dir = self.getdir("client0")

        # we need to remove the 'webport' old-style config file
        # or else the node won't start
        os.remove(os.path.join(client_dir, "webport"))

        # Spell the boolean out explicitly instead of indexing a tuple
        # with it.
        if self.ambientUploadAuthority:
            ambient = "true"
        else:
            ambient = "false"

        f = open(os.path.join(client_dir, "tahoe.cfg"), "wt")
        # try/finally so the handle is closed even if one of the writes
        # (or the formatting of its argument) raises.
        try:
            f.write("\n")
            f.write("[node]\n")
            f.write("web.ambient_upload_authority = %s\n" % ambient)
            f.write("web.port = tcp:%d:interface=%s\n"
                    % (self.webport, self.webip))
            f.write("\n")
            f.write("[client]\n")
            f.write("introducer.furl = %s\n" % self.introducer_furl)
            f.write("\n")
            f.write("[storage]\n")
            f.write("enabled = true\n")
            f.write("\n")
        finally:
            f.close()

    def _test_ambient_upload_authority2(self, ignored=None):
        """Issue four upload/mkdir requests and check each response status.

        ignored -- result of the previous callback; not used.

        Every request is expected to answer http.OK when the option is
        enabled and http.BAD_REQUEST when it is disabled. Returns a
        DeferredList that errbacks as soon as any single check fails.
        """
        # The body delimiters are "--" followed by the boundary declared in
        # the Content-Type header.
        content_type = 'multipart/form-data; boundary=----------ThIs_Is_tHe_bouNdaRY_$'
        body = '------------ThIs_Is_tHe_bouNdaRY_$\r\nContent-Disposition: form-data; name="t"\r\n\r\nupload\r\n------------ThIs_Is_tHe_bouNdaRY_$\r\nContent-Disposition: form-data; name="file"; filename="file1.txt"\r\nContent-Type: application/octet-stream\r\n\r\nsome test text\r\n------------ThIs_Is_tHe_bouNdaRY_$--\r\n'
        headers = {'Content-Type': content_type,
                   'Content-Length': len(body)}

        deferreds = []
        if self.ambientUploadAuthority:
            expected = http.OK
        else:
            expected = http.BAD_REQUEST

        # try to upload using the local web client
        def tryRequest(pathetc, method, postdata=None, headers=None):
            url = "http://%s:%d/%s" % (self.webip, self.webport, pathetc)
            f = client.HTTPClientFactory(url, method, postdata, headers)
            # The status must be checked whether the fetch succeeded or
            # failed, because the expected status may itself be an error
            # code. addBoth runs the check exactly once on either path.
            f.deferred.addBoth(self._cbCheckResponse, [f, expected])
            deferreds.append(f.deferred)
            reactor.connectTCP(self.webip, self.webport, f)

        tryRequest("uri", "PUT", "non contents\r\n")
        tryRequest("uri?t=mkdir", "PUT")
        tryRequest("uri?t=mkdir", "POST")
        tryRequest("uri?t=upload", "POST", body, headers)

        # give us one deferred that will fire iff all of the above succeed
        dlist = defer.DeferredList(deferreds, fireOnOneCallback=False,
                                   fireOnOneErrback=True)
        dlist.addErrback(self.fail)

        return dlist

    def _cbCheckResponse(self, ignored, cmp):
        """Fail the test unless the response status equals the expected one.

        ignored -- the callback result or Failure of the fetch; not examined.
        cmp -- two-item sequence [factory, expected_status]; the status is
               read from factory.status.

        Returns None, so a fetch that failed with the expected status does
        not propagate as an error.
        """
        factory = cmp[0]
        expected = cmp[1]
        # failUnlessEqual reports both values when the check fails.
        self.failUnlessEqual(int(factory.status), expected)
80       
81       
class TestAmbientUploadAuthorityEnabled(TestCase):
    """Runs the shared upload scenario with the option set to True."""

    def setUp(self):
        # Base-class setup first, then select the enabled state.
        TestCase.setUp(self)
        self.setAmbientUploadAuthority(ambientUploadAuthority=True)

    def test_ambient_upload_authority_enabled(self):
        # Hand the Deferred back to trial so it waits for the requests.
        d = self._test_ambient_upload_authority()
        return d
89
class TestAmbientUploadAuthorityDisabled(TestCase):
    """Runs the shared upload scenario with the option set to False."""

    def setUp(self):
        # Base-class setup first, then select the disabled state.
        TestCase.setUp(self)
        self.setAmbientUploadAuthority(ambientUploadAuthority=False)

    def test_ambient_upload_authority_disabled(self):
        # Hand the Deferred back to trial so it waits for the requests.
        d = self._test_ambient_upload_authority()
        return d
97