💾 Archived View for blakes.dev › gmEditor › gmEditor.py captured on 2024-08-25 at 00:08:31.

View Raw

More Information

⬅️ Previous capture (2024-07-09)

-=-=-=-=-=-=-

# gmEditor 2.0.2, a Gemini capsule editor tool
# Copyright (C) 2024 Blake Leonard
# 
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
# 
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
# 
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 
# SOFTWARE.

docs = """```
=== DOCUMENTATION ===
                   ___                        
 ,---. ,--,--,--. / (_)   |  o                
| .-. ||        | \__   __|    _|_  __   ,_   
' '-' '|  |  |  | /    /  |  |  |  /  \_/  |  
.`-  / `--`--`--' \___/\_/|_/|_/|_/\__/    |_/
`---'                                         
gmEditor: v2.0.2

This GMCapsule extension module allows you to easily and powerfully upload to
Gemini capsules via the Titan companion protocol.
It is configured with the "uploads" key in GMCapsule's config file
(see below).


subject to IP ban or certificate removal if you do this.

it's more likely that I'll ignore your request if you ask for help. This is
not your fault, but you have been warned.

if they're in the URL. Be careful about what token is currently active! This
is why gmEditor put in various checks, like the path repeated in the delete
token, the content checks for move and copy, and the accidental overwrite
checks.

To set backed-up Gemtext files to render as Gemtext, add the following line
to the root ".meta" file:


=== CONFIGURATION REFERENCE ===
[uploads]
certs = 23ac...
  # WHITESPACE separated public key or certificate fingerprints.
  # These 'certs' apply to all hostnames by default.
  # REQUIRED unless a per-domain section is configured.

[uploads:example.com]
  # Replace example.com with your actual hostname.
  # This section is entirely optional.
  # If it's excluded, the defaults apply.
certs = 23ac...
  # Same as above; but for just this domain, if set.
  # NOTE: If 'certs' is unset, it defaults to the 'certs' under [uploads].
allow_global_certs = False
  # set to True to allow both this domain's 'certs' and global 'certs'.
exclude = PATH PATH2...
  # WHITESPACE separated path globs to exclude, i.e. '*.zip' or 'excluded/'.

=== ACTION TOKEN REFERENCE ===
<empty>
  Edit the current file, or create it if necessary. The "current file" is the
  one you're trying to create or edit in your client; typically this shows up
  after the domain name and port in the URL bar or at the top of your editing
  form.
  On success: 30 redirect to the current file on Gemini.

about |OR| help
  This help page!
  20 status code with this page in plain-text. The tick-marks at the top force
  Gemtext-only clients to render this page in preformatted plain text.

delete <PATH>
  Replace <PATH> with the path to delete, which must match the request PATH.
  Include the leading /. This is done as a confirmation step, both to prevent
  accidental file deletion and to prevent accidentally deleting the wrong 
  file, which might happen when a client tries to re-use a token.
  If you forget the path or get it wrong, the upload handler will tell you.
  The content is not checked for this request. To save bandwidth, leave it
  empty.
  On success: 52 status code with message "Deleted successfully".

copy <PATH> |OR| cp <PATH>
  Replace <PATH> with the destination to copy the file being edited to. If
  the leading / is not present, the path is relative to the current file or
  directory.
  The contents must be empty or identical to the source file. This is to
  prevent ambiguity. If you want to edit the current file and copy the
  original, use the related "backup" token.
  On success: 30 redirect to the new <PATH> on Titan, so you can edit it.
  Token will be unset.

move <PATH> |OR| mv <PATH> |OR| rename <PATH>
  Replace <PATH> with the new path of this file. If the leading / is not
  present, the path is relative to the current file or directory, useful for
  renaming.
  The contents must be empty or identical to the source file. This is to
  prevent ambiguity.
  On success: 30 redirect to the new <PATH> on Titan, so you can edit it.
  Token will be unset.

backup [PATH]
  Copy the current version of the edited file to [PATH], or if it's not
  provided, a timestamped backup version, and apply the contents to the
  edited file as normal.
  If the edited contents are empty, the original file is untouched. If the
  edited file is actually a directory, the directory gets backed up!
  If the edited contents are not empty, though, and the edited file is a
  directory, the index.gmi file gets edited and backed up.
  An autogenerated file name consists of the original filename, a hyphen,
  the UNIX timestamp at the time of backup, and the suffix ".bak".
  On success: 
    if empty: 30 redirect to the backed-up file on Titan, with the
    token unset.
    if not empty: 30 redirect to the current file on Gemini.

import <PATH>
  Copy the contents of the provided file path to the current file. The
  contents must be empty to confirm this action.
  Future improvements may allow remote lookups by URI.
  On success: 30 redirect to the current file on Gemini.

[e]ls
  Lists the contents of the current directory (either the current file or the
  directory it's in).
  No editing operation is done.
  The file or directory must already exist.
  On success: 20 status code with a directory listing in Gemtext. The links 
  are to the Gemini version, unless "els" is used, in which case the Titan
  paths will be linked to instead for easy editing.
  WARNING: your browser may not look up the contents of the link before
  offering the Titan form to you! So this is best paired with action tokens
  that don't use the content, like copy, ls, import, or delete.
"""

#import gmcapsule;
import os
import pathlib
import urllib
import shutil


class GeminiError(Exception):
    def __init__(self, *args):
        self.response = args
def returnGeminiError(inner):
    def wrapper(*args, **kwargs):
        try:
            return inner(*args, **kwargs)
        except GeminiError as e:
            return e.response
    return wrapper

class UploadHandler:
    def __init__(self, context, host=None, host_config={}, global_allowed_keys=[]):
        self.host = host
        self.context = context
        self.config = host_config
        self.global_allowed_keys = global_allowed_keys

        self.local_allowed_keys = []
        if self.config["certs"]:
            self.local_allowed_keys = self.config["certs"].split()
        self.global_fallback_keys_allowed = self.config.getboolean("allow_global_certs", False)

    def check_parent_traversal(self, path, fp_cert):
        """
        Just call it to run the check.
        Will raise a GeminiError if it fails.
        Best used with @returnGeminiError.
        """
        if "/../" in path or path.endswith("/..") or path.startswith("../"):
            # TODO: fail2ban whoever tries to do this
            print("[gmEditor] WARNING: "+fp_cert+" just tried to do parent traversal!")
            raise GeminiError(59, "Parent traversal is not allowed. You have been reported.")
        return None
    def check_file_unchanged(self, file, content, can_empty=True):
        """
        Call this with the file path and text contents to compare.
        Will raise a GeminiError if it fails; best used with @returnGeminiError.
        """
        with open(file, 'rb') as source:
            # check if the files are the same (by hash)
            import hashlib
            source_hash = hashlib.sha256()
            while True:
                data = source.read(4096)
                if not data:
                    break
                source_hash.update(data)
            content_hash = hashlib.sha256()
            content_hash.update(content)
            if source_hash.hexdigest() != content_hash.hexdigest():
                if can_empty:
                    raise GeminiError(59, "To avoid confusion, either empty the content (preferred) or do not change it from the source file.")
                else:
                    raise GeminiError(59, "To avoid confusion, do not change the content from the source file.")

    @returnGeminiError
    def upload(self, request):
        if request.content_token:
            request.content_token = urllib.parse.unquote(request.content_token)
        excludes = self.config.get("exclude", "").split() if "exclude" in self.config else []
        # if request.path in excludes:
        #     # TODO: is this the correct way to cause it to skip the handler?
        #     raise "Excluded"
        if request.scheme != "titan":
            return (59, "Protocol must be Titan to edit files.")
        for path in excludes:
            if pathlib.PurePosixPath(request.path).match(path):
                return (59, "Editing here is forbidden for everyone.")

        def get_path():
            return (self.context.cfg.root_dir() / request.hostname).joinpath(request.path[1:])

        if not request.identity:
            return (60, "I'm sorry Dave, I'm afraid I can't do that.")

        allowed = False
        if "certs" in self.config:
            for cert in self.config["certs"].split():
                if request.identity.fp_cert.lower() == cert.lower():
                    allowed = True
                    break
                elif request.identity.fp_pubkey.lower() == cert.lower():
                    allowed = True
                    break
            if allowed == False and not self.global_fallback_keys_allowed:
                return (61, "You are not allowed to upload to this capsule.")
        for cert in self.global_allowed_keys:
            if request.identity.fp_cert.lower() == cert.lower():
                allowed = True
                break
            elif request.identity.fp_pubkey.lower() == cert.lower():
                allowed = True
                break
        if allowed == False:
            return (61, "You are not allowed to upload to this capsule.")

        self.check_parent_traversal(request.path, request.identity.fp_cert)
        # if "/../" in request.path or request.path.endswith("/..") or request.path.startswith("../"):
        #     # TODO: fail2ban whoever tries to do this
        #     print("[gmEditor] WARNING: "+request.identity.fp_cert+" just tried to do parent traversal!")
        #     return (59, "Parent traversal is not allowed. You have been reported.")

        # TODO: handle copy and move tokens here
        if request.content_token in ["about", "help"]:
            return (20, "text/plain", docs)

        elif not request.content_token:
            if not request.content:
                return (51, "No data was submitted for upload. To delete this file, use the delete token.")
            try:
                path = get_path()
                if path.is_dir():
                    path = os.path.join(path, "index.gmi")
                    if not os.path.exists(os.path.dirname(path)):
                        os.makedirs(os.path.dirname(path), exist_ok=True)
                elif not os.path.exists(path) and request.path.endswith("/"):
                    # if the path ends in a / and it doesn't already exist,
                    # create an index.gmi file
                    path = os.path.join(path, "index.gmi")
                    if not os.path.exists(os.path.dirname(path)):
                        os.makedirs(path, exist_ok=True)
                elif not os.path.exists(os.path.dirname(path)):
                    os.makedirs(os.path.dirname(path))
                file = open(path, mode='wb')
                file.write(request.content)
                file.close()
                return (30, "gemini://"+request.hostname+request.path)
            except FileNotFoundError:
                return (51, "Could not find file to edit (this shouldn't happen!)")
            except PermissionError as e:
                print(e)
                return (42, "Permission denied when editing file")
            except Exception as e:
                print(e)
                return (42, "Failed to edit file: "+str(e))

        elif request.content_token in ["cp", "copy"]:
            return (59, "Path is required. Add it after the token.")
        elif request.content_token.startswith(("cp ", "copy ")):
            destination = request.content_token[3:] if request.content_token.startswith("cp ") else request.content_token[5:]
            self.check_parent_traversal(destination, request.identity.fp_cert)
            source_filepath = str(self.context.cfg.root_dir() / request.hostname) + request.path
            if request.content: # let empty content bypass this check
                self.check_file_unchanged(source_filepath, request.content)
            destination_absolute = pathlib.PurePath(request.path).joinpath(destination)
            if os.path.isfile(source_filepath):
                destination_absolute = pathlib.PurePath(os.path.dirname(request.path)).joinpath(destination)
            destination_filepath = str(self.context.cfg.root_dir() / request.hostname) + str(destination_absolute)
            if not os.path.exists(os.path.dirname(destination_filepath)):
                os.makedirs(os.path.dirname(destination_filepath))
            if not os.path.exists(source_filepath):
                return (51, "Source file does not exist.")
            if os.path.exists(destination_filepath):
                return (59, "Destination file already exists; not overwriting it.")
            try:
                shutil.copy(source_filepath, destination_filepath, follow_symlinks = False)
                return (30, f"titan://{request.hostname}{destination_absolute};token=")
            except FileNotFoundError as e:
                print(e)
                return (51, "Could not find source file to copy.")
            except PermissionError as e:
                print(e)
                return (42, "Permission denied when attempting to write new file")
            except Exception as e:
                print(e)
                return (42, "Failed to copy file: "+str(e))

        elif request.content_token == "import":
            return (59, "Path is required. Add it after the token.")
        elif request.content_token.startswith("import "):
            template_path = request.content_token[7:]
            self.check_parent_traversal(template_path, request.identity.fp_cert)
            if request.content:
                return (59, "To confirm overwriting this file and conserve bandwidth, the content must be empty.")
            template_absolute = pathlib.PurePath(request.path).joinpath(template_path)
            destination_filepath = str(self.context.cfg.root_dir() / request.hostname) + request.path
            destination_absolute = request.path
            # if os.path.isfile(template_filepath):
            #     template_absolute = pathlib.PurePath(os.path.dirname(request.path)).joinpath(destination)
            template_filepath = str(self.context.cfg.root_dir() / request.hostname) + str(template_absolute)
            if not os.path.exists(os.path.dirname(destination_filepath)):
                os.makedirs(os.path.dirname(destination_filepath))
            if not os.path.exists(template_filepath):
                return (51, "Source file does not exist.")
            try:
                shutil.copy(template_filepath, destination_filepath, follow_symlinks = False)
                return (30, f"titan://{request.hostname}{destination_absolute};token=")
            except FileNotFoundError as e:
                print(e)
                return (51, "Could not find source file to import.")
            except PermissionError as e:
                print(e)
                return (42, "Permission denied when attempting to write new file")
            except Exception as e:
                print(e)
                return (42, "Failed to import file: "+str(e))

        elif request.content_token in ["move", "mv", "rename"]:
            return (59, "Path (new name) is required. Add it after the token.")
        elif request.content_token.startswith(("move ", "mv ", "rename ")):
            to_path = request.content_token[5:] if request.content_token.startswith("move ") else request.content_token[3:] if request.content_token.startswith("mv ") else request.content_token[7:]
            self.check_parent_traversal(to_path, request.identity.fp_cert)
            to_absolute = os.path.join(request.path, to_path)
            to_filepath = str(self.context.cfg.root_dir() / request.hostname) + to_absolute
            from_path = request.path # Is absolute.
            from_filepath = str(self.context.cfg.root_dir() / request.hostname) + from_path
            if os.path.isfile(from_filepath):
                to_absolute = os.path.join(os.path.dirname(request.path), to_path)
                to_filepath = str(self.context.cfg.root_dir() / request.hostname) + to_absolute
            if request.content: # let empty content bypass this check
                self.check_file_unchanged(from_filepath, request.content)
            if not os.path.exists(os.path.dirname(to_filepath)):
                os.makedirs(os.path.dirname(to_filepath))
            if not os.path.exists(from_filepath):
                return (51, "Source file does not exist.")
            try:
                shutil.move(from_filepath, to_filepath)
                return (30, f"titan://{request.hostname}{to_absolute};token=")
            except FileNotFoundError as e:
                print(e)
                return (51, "Could not find source file to move.")
            except PermissionError as e:
                print(e)
                return (42, "Permission denied when attempting to write new file")
            except Exception as e:
                print(e)
                return (42, "Failed to move file: "+str(e))

        elif request.content_token in ["backup", "bak"] or request.content_token.startswith(("backup ", "bak ")):
            import datetime
            backup_path = request.content_token[7:] if request.content_token.startswith("backup ") else request.content_token[4:] if request.content_token.startswith("bak ") else ""
            content_path = request.path # Is absolute.
            content_filepath = str(self.context.cfg.root_dir() / request.hostname) + content_path
            if request.content_token in ["backup", "bak"]:
                rpath = request.path
                if rpath.endswith("/"):
                    rpath = rpath[:-1]
                if os.path.isdir(content_filepath) and request.content:
                    rpath = os.path.join(rpath, "index.gmi")
                backup_path = rpath+f"-{int(datetime.datetime.now().timestamp()*1000)}.bak"
            else:
                self.check_parent_traversal(backup_path, request.identity.fp_cert)
            if os.path.isdir(content_filepath) and request.content:
                content_filepath = os.path.join(content_filepath, "index.gmi")
            backup_absolute = ""
            if os.path.isfile(content_filepath):
                backup_absolute = os.path.join(os.path.dirname(request.path), backup_path)
            else:
                backup_absolute = os.path.join(request.path, backup_path)
            backup_filepath = str(self.context.cfg.root_dir() / request.hostname) + backup_absolute
            if not os.path.exists(content_filepath):
                return (51, "File to backup and edit does not exist.")
            # Create backup
            if not os.path.exists(os.path.dirname(backup_filepath)):
                os.makedirs(os.path.dirname(backup_filepath))
            if os.path.exists(backup_filepath):
                return (59, "Backup destination file already exists; not overwriting it. No edits were made.")
            try:
                shutil.copy(content_filepath, backup_filepath, follow_symlinks = False)
            except FileNotFoundError as e:
                print(e)
                return (51, "Could not find source file to copy.")
            except PermissionError as e:
                print(e)
                return (42, "Permission denied when attempting to write new file")
            except Exception as e:
                print(e)
                return (42, "Failed to copy file: "+str(e))
            # Perform edit
            if not request.content:
                # If there are no edits to perform, attempt to edit the backup
                return (30, f"titan://{request.hostname}{backup_absolute};token=")
            if os.path.isdir(content_filepath):
                path = os.path.join(content_filepath, "index.gmi")
                if not os.path.exists(content_filepath):
                    os.makedirs(content_filepath, exist_ok=True)
            elif not os.path.exists(os.path.dirname(content_filepath)):
                os.makedirs(os.path.dirname(content_filepath))
            try:
                file = open(content_filepath, mode='wb')
                file.write(request.content)
                file.close()
                return (30, "gemini://"+request.hostname+request.path)
            except FileNotFoundError:
                return (51, "Could not find file to edit (this shouldn't happen!)")
            except PermissionError as e:
                print(e)
                return (42, "Permission denied when editing file")
            except Exception as e:
                print(e)
                return (42, "Failed to edit file: "+str(e))



        elif request.content_token == "delete":
            return (59, "Path is required. Add it after the token.")
        elif request.content_token.startswith("delete "):
            if not request.content_token[7:] == request.path:
                print(request.content_token[7:])
                print(request.path)
                return (50, "To confirm deletion, add the path to the end of the token. It must match exactly.")
            try:
                if os.path.isdir(get_path()):
                    os.rmdir(get_path())
                else:
                    os.remove(get_path())
                return (52, "Deleted successfully.")
            except FileNotFoundError:
                return (51, "Could not find file to delete.")
            except OSError:
                return (59, "The directory must be empty to delete it.")
            except PermissionError as e:
                print(e)
                return (42, "Permission denied when attempting to delete file")
            except Exception as e:
                print(e)
                return (42, "Failed to delete file: "+str(e))

        elif request.content_token in ["els", "ls"]:
            use_titan_links = request.content_token == "els"
            filepath = str(self.context.cfg.root_dir() / request.hostname) + request.path
            dirname = filepath if os.path.isdir(filepath) else os.path.dirname(filepath)
            basename = request.path if os.path.isdir(filepath) else os.path.dirname(request.path)
            files = os.scandir(dirname)
            files = sorted(files, key=lambda file: str.casefold(file.name))
            buffer = "# Directory listing for %s\r\n=> %s ..\r\n\r\n" % (os.path.basename(basename[:-1]), f"{'titan' if use_titan_links else 'gemini'}://{request.hostname}{os.path.dirname(basename[:-1])}")
            if basename == "/":
                buffer = f"# Directory listing for root of {request.hostname}\r\n\r\n"
            for file in files:
                buffer = buffer + f"=> {'titan' if use_titan_links else 'gemini'}://{request.hostname}{basename if basename != '/' else ''}/{file.name}{'/' if file.is_dir() else ''} {file.name}{'/' if file.is_dir() else ''}\r\n"
            return (20, "text/gemini", buffer)

        else:
            return (59, "Invalid token. Use the token 'help' with an empty body to see all valid tokens.")
        return (50, "A critical error occurred! The action token did not give a response.")

def init(context):
    if not "uploads" in context.cfg.ini:
        print("  Upload module global configuration missing.")
    config = context.cfg.ini["uploads"] if "uploads" in context.cfg.ini else {}
    try:
        # Accesses all of these to make sure that they're available
        config
        try:
            config['certs']
        except:
            try:
                context.cfg.prefixed_sections("uploads:")
            except:
                print("  Upload module configuration missing! Not initializing, so your files stay safe.")
                return
    except:
        print("  Upload module configuration failed to load! Not initializing, so your files stay safe.")
        return
    allowed_keys = config['certs'] if "certs" in config else []
    configured_hosts = context.cfg.prefixed_sections("uploads:")
    if len(configured_hosts) > 0:
        # There exist per-host upload configs!
        for host, lconfig in configured_hosts.items():
            try:
                context.add("/*", UploadHandler(context, host, lconfig, allowed_keys).upload, hostname=host, protocol="titan")
                print("  Adding upload handler to "+host)
            except Exception as e:
                print("  Failed to add handler to "+host+"! "+e)
    else:
        try:
            context.add("/*", UploadHandler(context, global_allowed_keys=allowed_keys).upload, protocol="titan")
            print("  Adding upload handler to all hosts!")
        except Exception as e:
            print("  Failed to add global handler! "+e)