💾 Archived View for blakes.dev › gmEditor › gmEditor.py captured on 2024-08-25 at 00:08:31.
View Raw
More Information
⬅️ Previous capture (2024-07-09)
-=-=-=-=-=-=-
# gmEditor 2.0.2, a Gemini capsule editor tool
# Copyright (C) 2024 Blake Leonard
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
docs = """```
=== DOCUMENTATION ===
___
,---. ,--,--,--. / (_) | o
| .-. || | \__ __| _|_ __ ,_
' '-' '| | | | / / | | | / \_/ |
.`- / `--`--`--' \___/\_/|_/|_/|_/\__/ |_/
`---'
gmEditor: v2.0.2
This GMCapsule extension module allows you to easily and powerfully upload to
Gemini capsules via the Titan companion protocol.
It is configured with the "uploads" key in GMCapsule's config file
(see below).
- If no configuration is specified, the module does not initialize.
- You are not allowed to perform directory traversal. You will be reported and
subject to IP ban or certificate removal if you do this.
- YOU ARE ON YOUR OWN if you want to use this. Support is not guaranteed and
it's more likely that I'll ignore your request if you ask for help. This is
not your fault, but you have been warned.
- Some clients apparently don't take tokens from redirects and override them
if they're in the URL. Be careful about what token is currently active! This
is why gmEditor put in various checks, like the path repeated in the delete
token, the content checks for move and copy, and the accidental overwrite
checks.
To set backed-up Gemtext files to render as Gemtext, add the following line
to the root ".meta" file:
=== CONFIGURATION REFERENCE ===
[uploads]
certs = 23ac...
# WHITESPACE separated public key or certificate fingerprints.
# These 'certs' apply to all hostnames by default.
# REQUIRED unless a per-domain section is configured.
[uploads:example.com]
# Replace example.com with your actual hostname.
# This section is entirely optional.
# If it's excluded, the defaults apply.
certs = 23ac...
# Same as above; but for just this domain, if set.
# NOTE: If 'certs' is unset, it defaults to the 'certs' under [uploads].
allow_global_certs = False
# set to True to allow both this domain's 'certs' and global 'certs'.
exclude = PATH PATH2...
# WHITESPACE separated path globs to exclude, i.e. '*.zip' or 'excluded/'.
=== ACTION TOKEN REFERENCE ===
<empty>
Edit the current file, or create it if necessary. The "current file" is the
one you're trying to create or edit in your client; typically this shows up
after the domain name and port in the URL bar or at the top of your editing
form.
On success: 30 redirect to the current file on Gemini.
about |OR| help
This help page!
20 status code with this page in plain-text. The tick-marks at the top force
Gemtext-only clients to render this page in preformatted plain text.
delete <PATH>
Replace <PATH> with the path to delete, which must match the request PATH.
Include the leading /. This is done as a confirmation step, both to prevent
accidental file deletion and to prevent accidentally deleting the wrong
file, which might happen when a client tries to re-use a token.
If you forget the path or get it wrong, the upload handler will tell you.
The content is not checked for this request. To save bandwidth, leave it
empty.
On success: 52 status code with message "Deleted successfully".
copy <PATH> |OR| cp <PATH>
Replace <PATH> with the destination to copy the file being edited to. If
the leading / is not present, the path is relative to the current file or
directory.
The contents must be empty or identical to the source file. This is to
prevent ambiguity. If you want to edit the current file and copy the
original, use the related "backup" token.
On success: 30 redirect to the new <PATH> on Titan, so you can edit it.
Token will be unset.
move <PATH> |OR| mv <PATH> |OR| rename <PATH>
Replace <PATH> with the new path of this file. If the leading / is not
present, the path is relative to the current file or directory, useful for
renaming.
The contents must be empty or identical to the source file. This is to
prevent ambiguity.
On success: 30 redirect to the new <PATH> on Titan, so you can edit it.
Token will be unset.
backup [PATH]
Copy the current version of the edited file to [PATH], or if it's not
provided, a timestamped backup version, and apply the contents to the
edited file as normal.
If the edited contents are empty, the original file is untouched. If the
edited file is actually a directory, the directory gets backed up!
If the edited contents are not empty, though, and the edited file is a
directory, the index.gmi file gets edited and backed up.
An autogenerated file name consists of the original filename, a hyphen,
the UNIX timestamp at the time of backup, and the suffix ".bak".
On success:
if empty: 30 redirect to the backed-up file on Titan, with the
token unset.
if not empty: 30 redirect to the current file on Gemini.
import <PATH>
Copy the contents of the provided file path to the current file. The
contents must be empty to confirm this action.
Future improvements may allow remote lookups by URI.
On success: 30 redirect to the current file on Gemini.
[e]ls
Lists the contents of the current directory (either the current file or the
directory it's in).
No editing operation is done.
The file or directory must already exist.
On success: 20 status code with a directory listing in Gemtext. The links
are to the Gemini version, unless "els" is used, in which case the Titan
paths will be linked to instead for easy editing.
WARNING: your browser may not look up the contents of the link before
offering the Titan form to you! So this is best paired with action tokens
that don't use the content, like copy, ls, import, or delete.
"""
#import gmcapsule;
import os
import pathlib
import urllib
import shutil
class GeminiError(Exception):
def __init__(self, *args):
self.response = args
def returnGeminiError(inner):
def wrapper(*args, **kwargs):
try:
return inner(*args, **kwargs)
except GeminiError as e:
return e.response
return wrapper
class UploadHandler:
def __init__(self, context, host=None, host_config={}, global_allowed_keys=[]):
self.host = host
self.context = context
self.config = host_config
self.global_allowed_keys = global_allowed_keys
self.local_allowed_keys = []
if self.config["certs"]:
self.local_allowed_keys = self.config["certs"].split()
self.global_fallback_keys_allowed = self.config.getboolean("allow_global_certs", False)
def check_parent_traversal(self, path, fp_cert):
"""
Just call it to run the check.
Will raise a GeminiError if it fails.
Best used with @returnGeminiError.
"""
if "/../" in path or path.endswith("/..") or path.startswith("../"):
# TODO: fail2ban whoever tries to do this
print("[gmEditor] WARNING: "+fp_cert+" just tried to do parent traversal!")
raise GeminiError(59, "Parent traversal is not allowed. You have been reported.")
return None
def check_file_unchanged(self, file, content, can_empty=True):
"""
Call this with the file path and text contents to compare.
Will raise a GeminiError if it fails; best used with @returnGeminiError.
"""
with open(file, 'rb') as source:
# check if the files are the same (by hash)
import hashlib
source_hash = hashlib.sha256()
while True:
data = source.read(4096)
if not data:
break
source_hash.update(data)
content_hash = hashlib.sha256()
content_hash.update(content)
if source_hash.hexdigest() != content_hash.hexdigest():
if can_empty:
raise GeminiError(59, "To avoid confusion, either empty the content (preferred) or do not change it from the source file.")
else:
raise GeminiError(59, "To avoid confusion, do not change the content from the source file.")
@returnGeminiError
def upload(self, request):
if request.content_token:
request.content_token = urllib.parse.unquote(request.content_token)
excludes = self.config.get("exclude", "").split() if "exclude" in self.config else []
# if request.path in excludes:
# # TODO: is this the correct way to cause it to skip the handler?
# raise "Excluded"
if request.scheme != "titan":
return (59, "Protocol must be Titan to edit files.")
for path in excludes:
if pathlib.PurePosixPath(request.path).match(path):
return (59, "Editing here is forbidden for everyone.")
def get_path():
return (self.context.cfg.root_dir() / request.hostname).joinpath(request.path[1:])
if not request.identity:
return (60, "I'm sorry Dave, I'm afraid I can't do that.")
allowed = False
if "certs" in self.config:
for cert in self.config["certs"].split():
if request.identity.fp_cert.lower() == cert.lower():
allowed = True
break
elif request.identity.fp_pubkey.lower() == cert.lower():
allowed = True
break
if allowed == False and not self.global_fallback_keys_allowed:
return (61, "You are not allowed to upload to this capsule.")
for cert in self.global_allowed_keys:
if request.identity.fp_cert.lower() == cert.lower():
allowed = True
break
elif request.identity.fp_pubkey.lower() == cert.lower():
allowed = True
break
if allowed == False:
return (61, "You are not allowed to upload to this capsule.")
self.check_parent_traversal(request.path, request.identity.fp_cert)
# if "/../" in request.path or request.path.endswith("/..") or request.path.startswith("../"):
# # TODO: fail2ban whoever tries to do this
# print("[gmEditor] WARNING: "+request.identity.fp_cert+" just tried to do parent traversal!")
# return (59, "Parent traversal is not allowed. You have been reported.")
# TODO: handle copy and move tokens here
if request.content_token in ["about", "help"]:
return (20, "text/plain", docs)
elif not request.content_token:
if not request.content:
return (51, "No data was submitted for upload. To delete this file, use the delete token.")
try:
path = get_path()
if path.is_dir():
path = os.path.join(path, "index.gmi")
if not os.path.exists(os.path.dirname(path)):
os.makedirs(os.path.dirname(path), exist_ok=True)
elif not os.path.exists(path) and request.path.endswith("/"):
# if the path ends in a / and it doesn't already exist,
# create an index.gmi file
path = os.path.join(path, "index.gmi")
if not os.path.exists(os.path.dirname(path)):
os.makedirs(path, exist_ok=True)
elif not os.path.exists(os.path.dirname(path)):
os.makedirs(os.path.dirname(path))
file = open(path, mode='wb')
file.write(request.content)
file.close()
return (30, "gemini://"+request.hostname+request.path)
except FileNotFoundError:
return (51, "Could not find file to edit (this shouldn't happen!)")
except PermissionError as e:
print(e)
return (42, "Permission denied when editing file")
except Exception as e:
print(e)
return (42, "Failed to edit file: "+str(e))
elif request.content_token in ["cp", "copy"]:
return (59, "Path is required. Add it after the token.")
elif request.content_token.startswith(("cp ", "copy ")):
destination = request.content_token[3:] if request.content_token.startswith("cp ") else request.content_token[5:]
self.check_parent_traversal(destination, request.identity.fp_cert)
source_filepath = str(self.context.cfg.root_dir() / request.hostname) + request.path
if request.content: # let empty content bypass this check
self.check_file_unchanged(source_filepath, request.content)
destination_absolute = pathlib.PurePath(request.path).joinpath(destination)
if os.path.isfile(source_filepath):
destination_absolute = pathlib.PurePath(os.path.dirname(request.path)).joinpath(destination)
destination_filepath = str(self.context.cfg.root_dir() / request.hostname) + str(destination_absolute)
if not os.path.exists(os.path.dirname(destination_filepath)):
os.makedirs(os.path.dirname(destination_filepath))
if not os.path.exists(source_filepath):
return (51, "Source file does not exist.")
if os.path.exists(destination_filepath):
return (59, "Destination file already exists; not overwriting it.")
try:
shutil.copy(source_filepath, destination_filepath, follow_symlinks = False)
return (30, f"titan://{request.hostname}{destination_absolute};token=")
except FileNotFoundError as e:
print(e)
return (51, "Could not find source file to copy.")
except PermissionError as e:
print(e)
return (42, "Permission denied when attempting to write new file")
except Exception as e:
print(e)
return (42, "Failed to copy file: "+str(e))
elif request.content_token == "import":
return (59, "Path is required. Add it after the token.")
elif request.content_token.startswith("import "):
template_path = request.content_token[7:]
self.check_parent_traversal(template_path, request.identity.fp_cert)
if request.content:
return (59, "To confirm overwriting this file and conserve bandwidth, the content must be empty.")
template_absolute = pathlib.PurePath(request.path).joinpath(template_path)
destination_filepath = str(self.context.cfg.root_dir() / request.hostname) + request.path
destination_absolute = request.path
# if os.path.isfile(template_filepath):
# template_absolute = pathlib.PurePath(os.path.dirname(request.path)).joinpath(destination)
template_filepath = str(self.context.cfg.root_dir() / request.hostname) + str(template_absolute)
if not os.path.exists(os.path.dirname(destination_filepath)):
os.makedirs(os.path.dirname(destination_filepath))
if not os.path.exists(template_filepath):
return (51, "Source file does not exist.")
try:
shutil.copy(template_filepath, destination_filepath, follow_symlinks = False)
return (30, f"titan://{request.hostname}{destination_absolute};token=")
except FileNotFoundError as e:
print(e)
return (51, "Could not find source file to import.")
except PermissionError as e:
print(e)
return (42, "Permission denied when attempting to write new file")
except Exception as e:
print(e)
return (42, "Failed to import file: "+str(e))
elif request.content_token in ["move", "mv", "rename"]:
return (59, "Path (new name) is required. Add it after the token.")
elif request.content_token.startswith(("move ", "mv ", "rename ")):
to_path = request.content_token[5:] if request.content_token.startswith("move ") else request.content_token[3:] if request.content_token.startswith("mv ") else request.content_token[7:]
self.check_parent_traversal(to_path, request.identity.fp_cert)
to_absolute = os.path.join(request.path, to_path)
to_filepath = str(self.context.cfg.root_dir() / request.hostname) + to_absolute
from_path = request.path # Is absolute.
from_filepath = str(self.context.cfg.root_dir() / request.hostname) + from_path
if os.path.isfile(from_filepath):
to_absolute = os.path.join(os.path.dirname(request.path), to_path)
to_filepath = str(self.context.cfg.root_dir() / request.hostname) + to_absolute
if request.content: # let empty content bypass this check
self.check_file_unchanged(from_filepath, request.content)
if not os.path.exists(os.path.dirname(to_filepath)):
os.makedirs(os.path.dirname(to_filepath))
if not os.path.exists(from_filepath):
return (51, "Source file does not exist.")
try:
shutil.move(from_filepath, to_filepath)
return (30, f"titan://{request.hostname}{to_absolute};token=")
except FileNotFoundError as e:
print(e)
return (51, "Could not find source file to move.")
except PermissionError as e:
print(e)
return (42, "Permission denied when attempting to write new file")
except Exception as e:
print(e)
return (42, "Failed to move file: "+str(e))
elif request.content_token in ["backup", "bak"] or request.content_token.startswith(("backup ", "bak ")):
import datetime
backup_path = request.content_token[7:] if request.content_token.startswith("backup ") else request.content_token[4:] if request.content_token.startswith("bak ") else ""
content_path = request.path # Is absolute.
content_filepath = str(self.context.cfg.root_dir() / request.hostname) + content_path
if request.content_token in ["backup", "bak"]:
rpath = request.path
if rpath.endswith("/"):
rpath = rpath[:-1]
if os.path.isdir(content_filepath) and request.content:
rpath = os.path.join(rpath, "index.gmi")
backup_path = rpath+f"-{int(datetime.datetime.now().timestamp()*1000)}.bak"
else:
self.check_parent_traversal(backup_path, request.identity.fp_cert)
if os.path.isdir(content_filepath) and request.content:
content_filepath = os.path.join(content_filepath, "index.gmi")
backup_absolute = ""
if os.path.isfile(content_filepath):
backup_absolute = os.path.join(os.path.dirname(request.path), backup_path)
else:
backup_absolute = os.path.join(request.path, backup_path)
backup_filepath = str(self.context.cfg.root_dir() / request.hostname) + backup_absolute
if not os.path.exists(content_filepath):
return (51, "File to backup and edit does not exist.")
# Create backup
if not os.path.exists(os.path.dirname(backup_filepath)):
os.makedirs(os.path.dirname(backup_filepath))
if os.path.exists(backup_filepath):
return (59, "Backup destination file already exists; not overwriting it. No edits were made.")
try:
shutil.copy(content_filepath, backup_filepath, follow_symlinks = False)
except FileNotFoundError as e:
print(e)
return (51, "Could not find source file to copy.")
except PermissionError as e:
print(e)
return (42, "Permission denied when attempting to write new file")
except Exception as e:
print(e)
return (42, "Failed to copy file: "+str(e))
# Perform edit
if not request.content:
# If there are no edits to perform, attempt to edit the backup
return (30, f"titan://{request.hostname}{backup_absolute};token=")
if os.path.isdir(content_filepath):
path = os.path.join(content_filepath, "index.gmi")
if not os.path.exists(content_filepath):
os.makedirs(content_filepath, exist_ok=True)
elif not os.path.exists(os.path.dirname(content_filepath)):
os.makedirs(os.path.dirname(content_filepath))
try:
file = open(content_filepath, mode='wb')
file.write(request.content)
file.close()
return (30, "gemini://"+request.hostname+request.path)
except FileNotFoundError:
return (51, "Could not find file to edit (this shouldn't happen!)")
except PermissionError as e:
print(e)
return (42, "Permission denied when editing file")
except Exception as e:
print(e)
return (42, "Failed to edit file: "+str(e))
elif request.content_token == "delete":
return (59, "Path is required. Add it after the token.")
elif request.content_token.startswith("delete "):
if not request.content_token[7:] == request.path:
print(request.content_token[7:])
print(request.path)
return (50, "To confirm deletion, add the path to the end of the token. It must match exactly.")
try:
if os.path.isdir(get_path()):
os.rmdir(get_path())
else:
os.remove(get_path())
return (52, "Deleted successfully.")
except FileNotFoundError:
return (51, "Could not find file to delete.")
except OSError:
return (59, "The directory must be empty to delete it.")
except PermissionError as e:
print(e)
return (42, "Permission denied when attempting to delete file")
except Exception as e:
print(e)
return (42, "Failed to delete file: "+str(e))
elif request.content_token in ["els", "ls"]:
use_titan_links = request.content_token == "els"
filepath = str(self.context.cfg.root_dir() / request.hostname) + request.path
dirname = filepath if os.path.isdir(filepath) else os.path.dirname(filepath)
basename = request.path if os.path.isdir(filepath) else os.path.dirname(request.path)
files = os.scandir(dirname)
files = sorted(files, key=lambda file: str.casefold(file.name))
buffer = "# Directory listing for %s\r\n=> %s ..\r\n\r\n" % (os.path.basename(basename[:-1]), f"{'titan' if use_titan_links else 'gemini'}://{request.hostname}{os.path.dirname(basename[:-1])}")
if basename == "/":
buffer = f"# Directory listing for root of {request.hostname}\r\n\r\n"
for file in files:
buffer = buffer + f"=> {'titan' if use_titan_links else 'gemini'}://{request.hostname}{basename if basename != '/' else ''}/{file.name}{'/' if file.is_dir() else ''} {file.name}{'/' if file.is_dir() else ''}\r\n"
return (20, "text/gemini", buffer)
else:
return (59, "Invalid token. Use the token 'help' with an empty body to see all valid tokens.")
return (50, "A critical error occurred! The action token did not give a response.")
def init(context):
if not "uploads" in context.cfg.ini:
print(" Upload module global configuration missing.")
config = context.cfg.ini["uploads"] if "uploads" in context.cfg.ini else {}
try:
# Accesses all of these to make sure that they're available
config
try:
config['certs']
except:
try:
context.cfg.prefixed_sections("uploads:")
except:
print(" Upload module configuration missing! Not initializing, so your files stay safe.")
return
except:
print(" Upload module configuration failed to load! Not initializing, so your files stay safe.")
return
allowed_keys = config['certs'] if "certs" in config else []
configured_hosts = context.cfg.prefixed_sections("uploads:")
if len(configured_hosts) > 0:
# There exist per-host upload configs!
for host, lconfig in configured_hosts.items():
try:
context.add("/*", UploadHandler(context, host, lconfig, allowed_keys).upload, hostname=host, protocol="titan")
print(" Adding upload handler to "+host)
except Exception as e:
print(" Failed to add handler to "+host+"! "+e)
else:
try:
context.add("/*", UploadHandler(context, global_allowed_keys=allowed_keys).upload, protocol="titan")
print(" Adding upload handler to all hosts!")
except Exception as e:
print(" Failed to add global handler! "+e)