source: trunk/src/allmydata/scripts/tahoe_add_alias.py

Last change on this file was 1cfe843d, checked in by Alexandre Detiste <alexandre.detiste@…>, at 2024-02-22T23:40:25Z

more python2 removal

  • Property mode set to 100644
File size: 6.2 KB
Line 
1"""
2Ported to Python 3.
3"""
4
5import os.path
6import codecs
7
8from allmydata.util.assertutil import precondition
9
10from allmydata import uri
11from allmydata.scripts.common_http import do_http, check_http_error
12from allmydata.scripts.common import get_aliases
13from allmydata.util.fileutil import move_into_place
14from allmydata.util.encodingutil import quote_output, quote_output_u
15from allmydata.util import jsonbytes as json
16
17
def add_line_to_aliasfile(aliasfile, alias, cap):
    """
    Append an ``alias: cap`` entry to the alias file.

    The current contents (if the file exists) are read, a trailing newline
    is added when missing, the new entry is appended, and the result is
    written to ``aliasfile + ".tmp"`` which is then handed to
    ``move_into_place`` to replace ``aliasfile``.

    :param aliasfile: Path of the alias file to update.
    :param alias: The alias name (text), without a trailing colon.
    :param cap: The capability string (text) the alias refers to.
    """
    # we use os.path.exists, rather than catching EnvironmentError, to avoid
    # clobbering the valuable alias file in case of spurious or transient
    # filesystem errors.
    if os.path.exists(aliasfile):
        # ``with`` guarantees the handle is closed even if read() raises
        # (for example on undecodable bytes).
        with codecs.open(aliasfile, "r", "utf-8") as f:
            aliases = f.read()
        if not aliases.endswith("\n"):
            aliases += "\n"
    else:
        aliases = ""
    aliases += "%s: %s\n" % (alias, cap)

    tmpfile = aliasfile + ".tmp"
    # Likewise, never leak the temp-file handle if write() fails.
    with codecs.open(tmpfile, "w", "utf-8") as f:
        f.write(aliases)
    move_into_place(tmpfile, aliasfile)
35
def add_alias(options):
    """
    Record an existing directory cap under a new alias name.

    Reads ``options['node-directory']``, ``options.alias``, ``options.cap``,
    ``options.stdout`` and ``options.stderr``.

    :return: 0 when the alias was added; 1 when the name contains a colon
        or a space, or when an alias of that name already exists.
    """
    nodedir = options['node-directory']
    name = options.alias
    precondition(isinstance(name, str), alias=name)
    requested_cap = options.cap
    out = options.stdout
    err = options.stderr

    # a single trailing colon will already have been stripped if present
    forbidden = (
        (u":", "Alias names cannot contain colons."),
        (u" ", "Alias names cannot contain spaces."),
    )
    for char, complaint in forbidden:
        if char in name:
            print(complaint, file=err)
            return 1

    if name in get_aliases(nodedir):
        show_output(err, "Alias {alias} already exists!", alias=name)
        return 1

    alias_path = os.path.join(nodedir, "private", "aliases")
    # Round-trip the cap through the uri module and store it as text.
    cap_bytes = uri.from_string_dirnode(requested_cap).to_string()
    cap_text = str(cap_bytes, 'utf-8')

    add_line_to_aliasfile(alias_path, name, cap_text)
    show_output(out, "Alias {alias} added", alias=name)
    return 0
61
def create_alias(options):
    """
    Create a new directory via the node's web API and alias it (mkdir +
    add_alias).

    Reads ``options['node-directory']``, ``options['node-url']``,
    ``options.alias``, ``options.stdout`` and ``options.stderr``.

    :return: 0 when the alias was created; 1 when the name contains a colon
        or a space, or when an alias of that name already exists; otherwise
        the non-zero result of ``check_http_error`` for the mkdir request.
    """
    nodedir = options['node-directory']
    name = options.alias
    precondition(isinstance(name, str), alias=name)
    out = options.stdout
    err = options.stderr

    # a single trailing colon will already have been stripped if present
    forbidden = (
        (u":", "Alias names cannot contain colons."),
        (u" ", "Alias names cannot contain spaces."),
    )
    for char, complaint in forbidden:
        if char in name:
            print(complaint, file=err)
            return 1

    if name in get_aliases(nodedir):
        show_output(err, "Alias {alias} already exists!", alias=name)
        return 1

    alias_path = os.path.join(nodedir, "private", "aliases")

    base = options['node-url']
    base = base if base.endswith("/") else base + "/"
    resp = do_http("POST", base + "uri?t=mkdir")
    rc = check_http_error(resp, err)
    if rc:
        return rc
    # The response body is the new directory's cap, as bytes.
    new_cap = resp.read().strip()

    # probably check for others..

    add_line_to_aliasfile(alias_path, name, str(new_cap, "utf-8"))
    show_output(out, "Alias {alias} created", alias=name)
    return 0
99
100
def show_output(fp, template, **kwargs):
    """
    Print a formatted message to just about anything.

    Each keyword value is passed through ``quote_output_u`` and then
    substituted into ``template`` with ``str.format``.

    :param fp: A file-like object to which to print.  If ``fp`` has a truthy
        ``encoding`` attribute, that encoding is used.  If the attribute is
        present but falsy (eg ``None``), utf-8 is used.  In both cases
        characters the encoding cannot represent are rewritten by the
        "namereplace" error handler and text is printed.  If ``fp`` has no
        ``encoding`` attribute at all, the utf-8-encoded bytes object is
        handed to ``print`` unchanged.
    :param template: The ``str.format`` template to fill in.
    :param kwargs: Values for the template's named fields.
    """
    assert isinstance(template, str)

    # A sentinel distinguishes "no encoding attribute" from "encoding is
    # None"; the two cases are printed differently below.
    absent = object()
    declared = getattr(fp, "encoding", absent)
    has_encoding = declared is not absent
    if has_encoding and declared:
        encoding = declared
    else:
        encoding = "utf-8"

    quoted = {}
    for key, value in kwargs.items():
        quoted[key] = quote_output_u(value, encoding=encoding)
    text = template.format(**quoted)

    # Round-trip through the target encoding so unencodable characters
    # become \N{...} escapes instead of raising on output.
    encoded = text.encode(encoding, "namereplace")
    print(encoded.decode(encoding) if has_encoding else encoded, file=fp)
140
141
def _get_alias_details(nodedir):
    """
    Collect the read-write and read-only cap strings for every alias.

    :param nodedir: The node directory passed to ``get_aliases``.

    :return: A dict mapping each alias name to a dict with ``"readwrite"``
        and ``"readonly"`` entries, built in sorted alias-name order.
    """
    aliases = get_aliases(nodedir)

    def describe(cap_string):
        # Parse once, then derive both forms from the same object.
        parsed = uri.from_string(cap_string)
        return {
            "readwrite": parsed.to_string(),
            "readonly": parsed.get_readonly().to_string(),
        }

    return {
        name: describe(aliases[name])
        for name in sorted(aliases.keys())
    }
153
154
155def _escape_format(t):
156    """
157    _escape_format(t).format() == t
158
159    :param unicode t: The text to escape.
160    """
161    return t.replace("{", "{{").replace("}", "}}")
162
163
def list_aliases(options):
    """
    Show aliases that exist.

    Reads ``options['node-directory']``, ``options['json']``,
    ``options['readonly-uri']`` and ``options.stdout``.  With ``json`` set,
    the alias details are dumped as indented JSON; otherwise one
    ``name: cap`` line is produced per alias, showing the read-only cap when
    ``readonly-uri`` is set and the read-write cap otherwise.

    :return: Always 0.
    """
    data = _get_alias_details(options['node-directory'])

    if options['json']:
        rendered = json.dumps(data, indent=4)
        if isinstance(rendered, bytes):
            rendered = rendered.decode("utf-8")
    else:
        def dircap(details):
            return (
                details['readonly']
                if options['readonly-uri']
                else details['readwrite']
            ).decode("utf-8")

        # Column width is measured on the quote_output form of each name;
        # the extra [0] keeps max() from failing when there are no aliases.
        max_width = max([len(quote_output(name)) for name in data.keys()] + [0])
        fmt = "%" + str(max_width) + "s: %s"
        rendered = "\n".join(
            fmt % (name, dircap(details))
            for name, details
            in data.items()
        )

    if rendered:
        # Show whatever we computed.  Skip this if there is no output to avoid
        # a spurious blank line.
        #
        # show_output() treats its second argument as a str.format template,
        # so braces must be escaped in *both* branches: alias names may
        # contain "{" or "}" just as JSON text does.
        show_output(options.stdout, _escape_format(rendered))

    return 0
Note: See TracBrowser for help on using the repository browser.