source: trunk/src/allmydata/scripts/common.py

Last change on this file was ae29ea2, checked in by Itamar Turner-Trauring <itamar@…>, at 2023-03-24T15:51:40Z

Fix lint, and some Python 3 cleanups.

  • Property mode set to 100644
File size: 10.0 KB
Line 
1# coding: utf-8
2
3"""
4Ported to Python 3.
5"""
6
7from typing import Union, Optional
8
9import os, sys, textwrap
10import codecs
11from os.path import join
12import urllib.parse
13
14from yaml import (
15    safe_dump,
16)
17
18from twisted.python import usage
19
20from allmydata.util.assertutil import precondition
21from allmydata.util.encodingutil import quote_output, \
22    quote_local_unicode_path, argv_to_abspath
23from allmydata.scripts.default_nodedir import _default_nodedir
24from .types_ import Parameters
25
26
27def get_default_nodedir():
28    return _default_nodedir
29
30def wrap_paragraphs(text, width):
31    # like textwrap.wrap(), but preserve paragraphs (delimited by double
32    # newlines) and leading whitespace, and remove internal whitespace.
33    text = textwrap.dedent(text)
34    if text.startswith("\n"):
35        text = text[1:]
36    return "\n\n".join([textwrap.fill(paragraph, width=width)
37                        for paragraph in text.split("\n\n")])
38
39class BaseOptions(usage.Options):
40    def __init__(self):
41        super(BaseOptions, self).__init__()
42        self.command_name = os.path.basename(sys.argv[0])
43
44    # Only allow "tahoe --version", not e.g. "tahoe <cmd> --version"
45    def opt_version(self):
46        raise usage.UsageError("--version not allowed on subcommands")
47
48    description : Optional[str] = None
49    description_unwrapped = None  # type: Optional[str]
50
51    def __str__(self):
52        width = int(os.environ.get('COLUMNS', '80'))
53        s = (self.getSynopsis() + '\n' +
54             "(use 'tahoe --help' to view global options)\n" +
55             '\n' +
56             self.getUsage())
57        if self.description:
58            s += '\n' + wrap_paragraphs(self.description, width) + '\n'
59        if self.description_unwrapped:
60            du = textwrap.dedent(self.description_unwrapped)
61            if du.startswith("\n"):
62                du = du[1:]
63            s += '\n' + du + '\n'
64        return s
65
66class BasedirOptions(BaseOptions):
67    default_nodedir = _default_nodedir
68
69    optParameters : Parameters = [
70        ["basedir", "C", None, "Specify which Tahoe base directory should be used. [default: %s]"
71         % quote_local_unicode_path(_default_nodedir)],
72    ]
73
74    def parseArgs(self, basedir=None):
75        # This finds the node-directory option correctly even if we are in a subcommand.
76        root = self.parent
77        while root.parent is not None:
78            root = root.parent
79
80        if root['node-directory'] and self['basedir']:
81            raise usage.UsageError("The --node-directory (or -d) and --basedir (or -C) options cannot both be used.")
82        if root['node-directory'] and basedir:
83            raise usage.UsageError("The --node-directory (or -d) option and a basedir argument cannot both be used.")
84        if self['basedir'] and basedir:
85            raise usage.UsageError("The --basedir (or -C) option and a basedir argument cannot both be used.")
86
87        if basedir:
88            b = argv_to_abspath(basedir)
89        elif self['basedir']:
90            b = argv_to_abspath(self['basedir'])
91        elif root['node-directory']:
92            b = argv_to_abspath(root['node-directory'])
93        elif self.default_nodedir:
94            b = self.default_nodedir
95        else:
96            raise usage.UsageError("No default basedir available, you must provide one with --node-directory, --basedir, or a basedir argument")
97        self['basedir'] = b
98        self['node-directory'] = b
99
100    def postOptions(self):
101        if not self['basedir']:
102            raise usage.UsageError("A base directory for the node must be provided.")
103
104class NoDefaultBasedirOptions(BasedirOptions):
105    default_nodedir = None
106
107    optParameters = [
108        ["basedir", "C", None, "Specify which Tahoe base directory should be used."],
109    ]  # type: Parameters
110
111    # This is overridden in order to ensure we get a "Wrong number of arguments."
112    # error when more than one argument is given.
113    def parseArgs(self, basedir=None):
114        BasedirOptions.parseArgs(self, basedir)
115
116    def getSynopsis(self):
117        return "Usage:  %s [global-options] %s [options] NODEDIR" % (self.command_name, self.subcommand_name)
118
119
120DEFAULT_ALIAS = u"tahoe"
121
122
123def write_introducer(basedir, petname, furl):
124    """
125    Overwrite the node's ``introducers.yaml`` with a file containing the given
126    introducer information.
127    """
128    if isinstance(furl, bytes):
129        furl = furl.decode("utf-8")
130    private = basedir.child(b"private")
131    private.makedirs(ignoreExistingDirectory=True)
132    private.child(b"introducers.yaml").setContent(
133        safe_dump({
134            "introducers": {
135                petname: {
136                    "furl": furl,
137                },
138            },
139        }).encode("ascii"),
140    )
141
142
143def get_introducer_furl(nodedir, config):
144    """
145    :return: the introducer FURL for the given node (no matter if it's
146        a client-type node or an introducer itself)
147    """
148    for petname, (furl, cache) in config.get_introducer_configuration().items():
149        return furl
150
151    # We have no configured introducers.  Maybe this is running *on* the
152    # introducer?  Let's guess, sure why not.
153    try:
154        with open(join(nodedir, "private", "introducer.furl"), "r") as f:
155            return f.read().strip()
156    except IOError:
157        raise Exception(
158            "Can't find introducer FURL in tahoe.cfg nor "
159            "{}/private/introducer.furl".format(nodedir)
160        )
161
162
163def get_aliases(nodedir):
164    aliases = {}
165    aliasfile = os.path.join(nodedir, "private", "aliases")
166    rootfile = os.path.join(nodedir, "private", "root_dir.cap")
167    try:
168        with open(rootfile, "r") as f:
169            rootcap = f.read().strip()
170            if rootcap:
171                aliases[DEFAULT_ALIAS] = rootcap
172    except EnvironmentError:
173        pass
174    try:
175        with codecs.open(aliasfile, "r", "utf-8") as f:
176            for line in f:
177                line = line.strip()
178                if line.startswith("#") or not line:
179                    continue
180                name, cap = line.split(u":", 1)
181                # normalize it: remove http: prefix, urldecode
182                cap = cap.strip().encode('utf-8')
183                aliases[name] = cap
184    except EnvironmentError:
185        pass
186    return aliases
187
188class DefaultAliasMarker(object):
189    pass
190
191pretend_platform_uses_lettercolon = False # for tests
192def platform_uses_lettercolon_drivename():
193    if ("win32" in sys.platform.lower()
194        or "cygwin" in sys.platform.lower()
195        or pretend_platform_uses_lettercolon):
196        return True
197    return False
198
199
200class TahoeError(Exception):
201    def __init__(self, msg):
202        Exception.__init__(self, msg)
203        self.msg = msg
204
205    def display(self, err):
206        print(self.msg, file=err)
207
208
209class UnknownAliasError(TahoeError):
210    def __init__(self, msg):
211        TahoeError.__init__(self, "error: " + msg)
212
213
214def get_alias(aliases, path_unicode, default):
215    """
216    Transform u"work:path/filename" into (aliases[u"work"], u"path/filename".encode('utf-8')).
217    If default=None, then an empty alias is indicated by returning
218    DefaultAliasMarker. We special-case strings with a recognized cap URI
219    prefix, to make it easy to access specific files/directories by their
220    caps.
221    If the transformed alias is either not found in aliases, or is blank
222    and default is not found in aliases, an UnknownAliasError is
223    raised.
224    """
225    precondition(isinstance(path_unicode, str), path_unicode)
226
227    from allmydata import uri
228    path = path_unicode.encode('utf-8').strip(b" ")
229    if uri.has_uri_prefix(path):
230        # We used to require "URI:blah:./foo" in order to get a subpath,
231        # stripping out the ":./" sequence. We still allow that for compatibility,
232        # but now also allow just "URI:blah/foo".
233        sep = path.find(b":./")
234        if sep != -1:
235            return path[:sep], path[sep+3:]
236        sep = path.find(b"/")
237        if sep != -1:
238            return path[:sep], path[sep+1:]
239        return path, b""
240    colon = path.find(b":")
241    if colon == -1:
242        # no alias
243        if default == None:
244            return DefaultAliasMarker, path
245        if default not in aliases:
246            raise UnknownAliasError("No alias specified, and the default %s alias doesn't exist. "
247                                    "To create it, use 'tahoe create-alias %s'."
248                                    % (quote_output(default), quote_output(default, quotemarks=False)))
249        return uri.from_string_dirnode(aliases[default]).to_string(), path
250    if colon == 1 and default is None and platform_uses_lettercolon_drivename():
251        # treat C:\why\must\windows\be\so\weird as a local path, not a tahoe
252        # file in the "C:" alias
253        return DefaultAliasMarker, path
254
255    # decoding must succeed because path is valid UTF-8 and colon & space are ASCII
256    alias = path[:colon].decode('utf-8')
257    if u"/" in alias:
258        # no alias, but there's a colon in a dirname/filename, like
259        # "foo/bar:7"
260        if default == None:
261            return DefaultAliasMarker, path
262        if default not in aliases:
263            raise UnknownAliasError("No alias specified, and the default %s alias doesn't exist. "
264                                    "To create it, use 'tahoe create-alias %s'."
265                                    % (quote_output(default), quote_output(default, quotemarks=False)))
266        return uri.from_string_dirnode(aliases[default]).to_string(), path
267    if alias not in aliases:
268        raise UnknownAliasError("Unknown alias %s, please create it with 'tahoe add-alias' or 'tahoe create-alias'." %
269                                quote_output(alias))
270    return uri.from_string_dirnode(aliases[alias]).to_string(), path[colon+1:]
271
272def escape_path(path: Union[str, bytes]) -> str:
273    """
274    Return path quoted to US-ASCII, valid URL characters.
275
276    >>> path = u'/føö/bar/☃'
277    >>> escaped = escape_path(path)
278    >>> escaped
279    u'/f%C3%B8%C3%B6/bar/%E2%98%83'
280    """
281    if isinstance(path, str):
282        path = path.encode("utf-8")
283    segments = path.split(b"/")
284    result = str(
285        b"/".join([
286            urllib.parse.quote(s).encode("ascii") for s in segments
287        ]),
288        "ascii"
289    )
290    return result
Note: See TracBrowser for help on using the repository browser.