going-flying.com gemini git repository
39a5e343eb11637d1d2feecf363dcb2fe735cac7 - Matthew Ernisse - 1595971185
test perhaps providing an interface to the site itself.
# ===========================================================================
# cgi-bin/git  (new file, mode 100755)
#
# CGI entry point.  It only extends sys.path and imports gateway; the request
# itself is answered by gateway's module-level call to handle_cgi_request().
# ===========================================================================
#!/usr/bin/env python3

# allow importing from within the place.
import os
import sys

# The helper modules live in the "git-gmi" directory NEXT TO this script, so
# the directory of the resolved script path is the anchor (joining onto the
# script path itself would name a location that cannot exist).
sys.path.append(
    os.path.join(os.path.dirname(os.path.realpath(__file__)), "git-gmi")
)
import gateway


# ===========================================================================
# cgi-bin/git-gmi/config.py  (new file, mode 100644)
# ===========================================================================
# where on the disk are the repos located
GIT_CATALOG = "/var/gemini"
# which path leads to your cgi app after the URL's host part
# NOTE(review): the entry script is cgi-bin/git, so this may need to be
# "/cgi-bin/git/" depending on how the server maps URLs -- confirm.
CGI_PATH = "/cgi-bin/"
# cache dir
CACHE_DIR = ""
# how long before cache expires, in seconds: int
CACHE_TTL = 120
# your site's display name
GIT_GMI_SITE_TITLE = "going-flying.com"
# the "main" branch that git.gmi defaults to
MAIN_BRANCH = "master"


# ===========================================================================
# cgi-bin/git-gmi/const.py  (new file, mode 100644)
# ===========================================================================
STATUS_SUCCESS = "20"
STATUS_NOT_FOUND = "51 NOT FOUND"
STATUS_TEMPORARY_FAILURE = "40 TEMPORARY FAILURE"
META_GEMINI = "text/gemini"
META_PLAINTEXT = "text/plain"
MAX_DISPLAYED_BLOB_SIZE = 500 * 1024  # 500KB


# ===========================================================================
# cgi-bin/git-gmi/gateway.py  (new file, mode 100644)
# ===========================================================================
from git import *
from const import *
from config import *
from os import environ, listdir
import sys

# be careful when using print(); stdout is passed to the client.
# this cgi uses \n as newline.


def handle_cgi_request(path: str, query: str):
    """Answer one CGI request by printing a Gemini response to stdout.

    path is the PATH_INFO of the request ("/<repo>/<view>/..."); None or an
    empty value is treated like "/" and yields the welcome page.
    query is the QUERY_STRING; the value "raw" selects the raw variant of
    the blob and commit views.

    Nothing is returned.  Every branch prints exactly one response: a page,
    a redirect, or one of the STATUS_* error lines.
    """
    # intended to work with Jetforce.
    # hypothetical example:
    # url: gemini://git.gemini.site/git/cgi/repo/src/static/css/[index.css]
    # path: /repo/src/static/css/[index.css]
    # path_trace = ['repo', 'src', 'static', 'css', 'index.css']
    path_trace = (path or "/")[1:].split("/")
    if path_trace == [""]:  # empty path
        print(f"{STATUS_SUCCESS} {META_GEMINI}")  # welcome page
        print(f"# Welcome to {GIT_GMI_SITE_TITLE}")
        print("## Available repositories:")
        print("\n".join(f"=> {entry}/" for entry in sorted(listdir(GIT_CATALOG))))
        return

    repo_name = path_trace[0]
    # The repo name comes straight from the URL.  split("/") already removed
    # the slashes, but "." and ".." would still point outside GIT_CATALOG
    # (and outside the cache directory), so refuse them.
    if repo_name in ("", ".", ".."):
        print(STATUS_NOT_FOUND)
        return

    try:
        repo = GitGmiRepo(repo_name, f"{GIT_CATALOG}/{repo_name}")
    except FileNotFoundError:
        print(STATUS_NOT_FOUND)
        return

    # Redirect targets are relative references, so they depend on whether the
    # request URL ended with a slash.
    if len(path_trace) == 1:
        # gemini://git.gemini.site/git/cgi/<repo>
        print(f"31 {repo_name}/summary")
        return

    view = path_trace[1]  # e.g. summary, tree, log
    if view == "":
        # gemini://git.gemini.site/git/cgi/<repo>/
        print("31 summary")
        return

    if view == "summary":
        try:
            print(repo.view_summary())
        except Exception:
            print(STATUS_TEMPORARY_FAILURE)

    elif view == "tree":
        if len(path_trace) == 2:
            # gemini://git.gemini.site/git/cgi/<repo>/tree
            print(f"31 tree/{MAIN_BRANCH}/")

        elif path_trace[2] == "":
            # gemini://git.gemini.site/git/cgi/<repo>/tree/
            print(f"31 {MAIN_BRANCH}/")

        else:
            # gemini://git.gemini.site/git/cgi/<repo>/tree/<branch>/
            branch = path_trace[2]

            # A trailing slash leaves an empty last segment; an empty name
            # can never match a tree entry, so drop such segments.
            location = [segment for segment in path_trace[3:] if segment]

            try:  # is dir
                print(repo.view_tree(branch, location))
            except FileNotFoundError:  # is file
                try:
                    if query == "raw":
                        sys.stdout.buffer.write(repo.view_raw_blob(branch, location))
                    else:
                        print(repo.view_blob(branch, location))
                except FileNotFoundError:
                    print(STATUS_NOT_FOUND)

    elif view == "log":
        try:
            print(repo.view_log())
        except Exception:
            print(STATUS_TEMPORARY_FAILURE)

    elif view == "commit":
        commit_str = path_trace[2] if len(path_trace) > 2 else ""
        if not commit_str:
            print("50 No commit id given")
            return

        try:
            if query == "raw":
                print(repo.view_raw_commit(commit_str))
            else:
                print(repo.view_commit(commit_str))
        except FileNotFoundError:
            print("50 No such commit")
        except Exception:
            print(STATUS_TEMPORARY_FAILURE)

    elif view == "refs":
        try:
            print(repo.view_refs())
        except Exception:
            print(STATUS_TEMPORARY_FAILURE)

    else:
        # unknown view: answer explicitly instead of sending nothing
        print(STATUS_NOT_FOUND)


handle_cgi_request(environ.get("PATH_INFO"), environ.get("QUERY_STRING"))


# ===========================================================================
# cgi-bin/git-gmi/git.py  (new file, mode 100644)
# ===========================================================================
from pygit2 import *
from hurry.filesize import size, alternative
from datetime import datetime, timedelta
from itertools import islice
import dateutil.parser
from pathlib import Path
import os
import shutil
import mimetypes
from const import *
from config import *
from utils import *

mimetypes.add_type("text/gemini", ".gmi")
mimetypes.add_type("text/gemini", ".gemini")


def convert_filesize(bytes: int) -> str:
    """Convert a file size in bytes to a human-friendly string."""
    return size(bytes, system=alternative)


class GitGmiRepo:
    """One git repository rendered as Gemini pages.

    The view_* methods return complete responses (status line included).
    Pages are cached as files named after the request inside
    CACHE_DIR/<repo name>; a cached page carries a "cached at:" footer and
    is reused for CACHE_TTL seconds.
    """

    def __init__(self, name: str, path: str):
        """Open the repository at *path*.

        Raises FileNotFoundError when pygit2 cannot open a repository there.
        """
        self.name = name
        self.path = path
        self.cache_dir = Path(CACHE_DIR) / name
        # Open the repository first: a request for a repo that does not
        # exist must not leave a cache directory behind.
        try:
            self.repo = Repository(path)
        except GitError as err:
            raise FileNotFoundError(f"Error: no such repo: {name}") from err
        self._init_cache()

    def _init_cache(self):
        """Create the cache directory (and missing parents) if needed."""
        self.cache_dir.mkdir(parents=True, exist_ok=True)

    def _read_cache(self, req: list) -> str:
        """Return the cached response for *req*, or None on a miss.

        A missing, empty, unparsable or expired cache file is a miss.
        """
        # req is what the user requests after the repo name,
        # like ["tree", "master", "src"]
        # which points to a file called tree_master_src.gmi
        # file content:
        # 20 text/gemini
        # [body - page content]
        # [newline]
        # cached at:
        # [iso timestamp]
        fn = "_".join(req) + ".gmi"
        try:
            # newline="" keeps the "\r\n" after the status line intact
            with open(self.cache_dir / fn, encoding="utf-8", newline="") as f:
                response = f.read()
        except FileNotFoundError:
            return None

        try:
            created_at = dateutil.parser.isoparse(response.splitlines()[-1])
        except (IndexError, ValueError):
            # empty or damaged file: regenerate the page
            return None

        if datetime.now() - created_at < timedelta(seconds=CACHE_TTL):
            # cache valid
            # response will include the timestamp
            return response
        return None

    def _write_cache(self, req: list, resp: str):
        """Store *resp* for *req*, appended with the current timestamp."""
        fn = "_".join(req) + ".gmi"
        # "w" creates or truncates; the with-block closes the handle
        with open(self.cache_dir / fn, "w", encoding="utf-8", newline="") as f:
            f.write(resp + "\ncached at:\n" + datetime.now().isoformat())

    def _flush_cache(self):
        """Delete this repository's cache directory, if it exists."""
        try:
            shutil.rmtree(self.cache_dir)
        except FileNotFoundError:
            pass

    def _generate_header(self):
        """Return the navigation header shown above all non-raw views."""
        header = (
            f"# {self.name}\n"
            f"=> {CGI_PATH} {GIT_GMI_SITE_TITLE}\n"
            f"=> {CGI_PATH}{self.name}/summary summary\n"
            f"=> {CGI_PATH}{self.name}/tree/{MAIN_BRANCH}/ tree\n"
            f"=> {CGI_PATH}{self.name}/log log\n"
            f"=> {CGI_PATH}{self.name}/refs refs\n\n"
        )
        return header

    def view_summary(self) -> str:
        """Return the summary page: three recent commits plus the readme."""
        cached = self._read_cache(["summary"])
        if cached is not None:
            return cached

        response = f"{STATUS_SUCCESS} {META_GEMINI}\r\n" + self._generate_header()
        # show 3 recent commits
        for cmt in self._get_commit_log(limit=3):
            time = str(datetime.utcfromtimestamp(cmt["time"])) + " UTC"
            # a commit message may be empty, so guard the first-line lookup
            subject = (cmt["msg"].splitlines() or [""])[0]
            response += (
                f"### {cmt['short_id']} - {cmt['author']} - {time}\n"
                f"{subject}\n\n"
            )  # TODO: link to commit view
        # find and display readme(.*)
        tree = self._get_tree(MAIN_BRANCH)
        for item in self._list_tree(tree):
            if item["type"] == "file" and item["name"].lower().split(".")[0] == "readme":
                readme = item["blob"].data.decode("utf-8", errors="replace")
                response += (
                    f"## {item['name']} | {convert_filesize(item['size'])}\n"
                    f"{readme}"
                )
                break  # only the first readme is shown
        else:
            response += "## No readme found."

        self._write_cache(["summary"], response)

        return response

    def _get_commit_log(self, limit=None) -> list:
        """Return info dicts for the commits reachable from HEAD.

        limit caps the number of commits read; None reads them all.
        """
        repo = self.repo
        walker = repo.walk(repo[repo.head.target].id, GIT_SORT_TIME)
        log = [
            {
                "id": str(cmt.id),  # hex SHA-1 hash
                "short_id": str(cmt.short_id),  # short version of the above
                "author": cmt.author.name,  # author's display name
                "time": cmt.commit_time,  # unix timestamp
                "msg": cmt.message,  # full commit message
            }
            for cmt in islice(walker, limit)
        ]

        return log  # reverse chronological order

    def view_log(self) -> str:
        """Return the log page listing every commit reachable from HEAD."""
        cached = self._read_cache(["log"])
        if cached is not None:
            return cached
        response = f"{STATUS_SUCCESS} {META_GEMINI}\r\n" + self._generate_header()
        for cmt in self._get_commit_log():
            # looks like "2020-06-06 04:51:21 UTC"
            time = str(datetime.utcfromtimestamp(cmt["time"])) + " UTC"
            response += (
                f"## {cmt['short_id']} - {cmt['author']} - {time}\n"
                f"=> commit/{cmt['id']} view diff\n"
                f"=> tree/{cmt['id']}/ view tree\n"
                f"{cmt['msg']}\n\n"
            )
        self._write_cache(["log"], response)
        return response

    def _get_commit(self, commit_str) -> dict:
        """Return id, author, time, message and patch of one commit.

        Raises FileNotFoundError when *commit_str* cannot be resolved.
        """
        try:
            commit = self.repo.revparse_single(commit_str)
        except (KeyError, ValueError) as err:
            # pygit2 reports "not found" as KeyError and a malformed spec
            # as ValueError
            raise FileNotFoundError(f"Error: no such commit: {commit_str}") from err

        if commit.parents:
            diff = self.repo.diff(commit.parents[0], commit)
        else:
            # root commit: compare with the empty tree; swap=True makes the
            # commit's content show up as additions
            diff = commit.tree.diff_to_tree(swap=True)

        return {
            "id": commit.id,
            "author": commit.author.name,
            "time": commit.commit_time,
            "msg": commit.message,
            "patch": diff.patch or "",  # patch is None for an empty diff
        }

    def view_commit(self, commit_str) -> str:
        """Return the commit page: metadata, links and the patch."""
        cached = self._read_cache(["commit", commit_str])
        if cached is not None:
            return cached
        commit = self._get_commit(commit_str)
        response = (
            f"{STATUS_SUCCESS} {META_GEMINI}\r\n"
            + self._generate_header()
            + f"{commit['id']} - {commit['author']} - {commit['time']}\n"
            + commit["msg"]
            + "\n"
            + f"=> {CGI_PATH}{self.name}/tree/{commit['id']}/ view tree\n"
            + f"=> {commit_str}?raw view raw\n"
            + "\n```\n"
            + commit["patch"]
            + "\n```"
        )
        self._write_cache(["commit", commit_str], response)
        return response

    def view_raw_commit(self, commit_str) -> str:
        """Return the commit's patch as a plain-text response."""
        commit = self._get_commit(commit_str)
        response = f"{STATUS_SUCCESS} {META_PLAINTEXT}\r\n" + commit["patch"]
        return response

    def _get_refs(self) -> list:
        """Return name, shorthand, target and type of every reference."""
        refs = self.repo.listall_reference_objects()
        return [
            {
                "name": ref.name,
                "shorthand": ref.shorthand,
                "target": ref.target,
                "type": ref.type,
            }
            for ref in refs
        ]

    def view_refs(self) -> str:
        """Return the refs page with a tree link per listed reference."""
        cached = self._read_cache(["refs"])
        if cached is not None:
            return cached
        response = f"{STATUS_SUCCESS} {META_GEMINI}\r\n" + self._generate_header()
        for ref in self._get_refs():
            # HACK: filter out refs with slashes as remote branches
            if "/" not in ref["shorthand"]:
                response += (
                    f"## {ref['shorthand']}\n=> tree/{ref['shorthand']}/ view tree\n\n"
                )
        self._write_cache(["refs"], response)
        return response

    @classmethod
    def _parse_recursive_tree(cls, tree: Tree) -> list:
        """Turn a Tree into a nested list.

        Each sub-Tree becomes a (name, contents) tuple, every other entry is
        kept as is, e.g. [('src', [blob0, blob1]), otherblob].
        """
        return [
            (item.name, cls._parse_recursive_tree(item))
            if isinstance(item, Tree)
            else item
            for item in tree
        ]

    def _get_tree(self, revision_str: str) -> list:
        """Return the parsed top-level tree of a commit or tag.

        Raises FileNotFoundError when *revision_str* cannot be resolved or
        resolves to something that is neither a Commit nor a Tag.
        """
        try:
            revision = self.repo.revparse_single(revision_str)
        except (KeyError, ValueError) as err:
            raise FileNotFoundError(f"Error: no such tree: {revision_str}") from err

        if isinstance(revision, Commit):
            # top level tree; may contain sub-trees
            return self._parse_recursive_tree(revision.tree)
        if isinstance(revision, Tag):
            return self._parse_recursive_tree(revision.get_object().tree)
        # e.g. the id of a blob: there is no tree to show
        raise FileNotFoundError(f"Error: no such tree: {revision_str}")

    @staticmethod
    def _list_tree(tree_list: list, location=None) -> list:
        """List the entries of one directory of a parsed tree.

        tree_list is the output of _parse_recursive_tree(<tree>); location
        is the directory to descend into, as a list of path components, e.g.
        ['src', 'static', 'css'] => 'src/static/css'.

        Returns one dict per entry: directories carry "type", "name" and
        "items"; files carry "type", "name", "blob" and "size".
        Raises FileNotFoundError when there is no such directory.
        """
        location = location or []
        trls = tree_list
        for loc in location:
            for item in trls:
                if isinstance(item, tuple) and item[0] == loc:
                    trls = item[1]
                    break
            else:
                raise FileNotFoundError(
                    f"Error: no such directory: {'/'.join(location)}"
                )

        contents = []
        for item in trls:
            if isinstance(item, tuple):
                # was originally a Tree; structure: ('dir_name', [list_of_blobs])
                contents.append(
                    {
                        "type": "dir",
                        "name": item[0],
                        "items": len(item[1]),  # number of objects in dir
                    }
                )

            elif isinstance(item, Blob):
                contents.append(
                    {
                        "type": "file",
                        "name": item.name,
                        "blob": item,
                        "size": item.size,  # size in bytes
                    }
                )

        return contents

    def view_tree(self, branch: str, location=None) -> str:
        """Return the directory listing page for *location* at *branch*.

        Raises FileNotFoundError when the revision or the directory does
        not exist; the gateway then retries the path as a file.
        """
        location = location or []
        # actual Gemini response
        # consists of a header and a body
        cached = self._read_cache(["tree", branch] + location)
        if cached is not None:
            return cached

        tree = self._get_tree(branch)
        contents = self._list_tree(tree, location)
        items = len(contents)
        response = (
            f"{STATUS_SUCCESS} {META_GEMINI}\r\n"
            + self._generate_header()
            + f"## {self.name}{'/' if location else ''}{'/'.join(location)}/"
            f" | {items} {'item' if items == 1 else 'items'}\n\n"
        )
        for item in contents:
            if item["type"] == "dir":
                response += (
                    f"=> {item['name']}/ {item['name']}/ | {item['items']} items\n"
                )
            elif item["type"] == "file":
                response += f"=> {item['name']} {item['name']} | {convert_filesize(item['size'])}\n"
        self._write_cache(["tree", branch] + location, response)
        return response

    def _get_blob(self, commit_str: str, location=None) -> Blob:
        """Return the Blob at *location* in revision *commit_str*.

        location is like that of _list_tree, but its last element is the
        file name.  Raises FileNotFoundError when the revision, the
        directory or the file does not exist, or when no path is given.
        """
        if not location:
            raise FileNotFoundError("Error: no file path given")

        tree = self._get_tree(commit_str)
        try:
            trls = self._list_tree(tree, location[:-1])
        except FileNotFoundError as err:
            raise FileNotFoundError(
                f"Error: No such tree: {'/'.join(location[:-1])}"
            ) from err

        for item in trls:
            if item["type"] == "file" and item["name"] == location[-1]:
                return item["blob"]
        raise FileNotFoundError(f"Error: no such file: {'/'.join(location)}")

    def view_blob(self, branch: str, location=None) -> str:
        """Return the page for one file: text with line numbers, or a
        download link when the file is binary or too large to display."""
        location = location or []
        # NOTE: this shares the cache key of view_tree and never writes it
        cached = self._read_cache(["tree", branch] + location)
        if cached is not None:
            return cached
        blob = self._get_blob(branch, location)
        response = (
            f"{STATUS_SUCCESS} {META_GEMINI}\r\n"
            + self._generate_header()
            + f"## {self.name}/{'/'.join(location)} | {convert_filesize(blob.size)}\n\n"
        )

        if blob.is_binary:
            response += (
                "This file seems to be binary. Open link below to download.\n"
                f"=> {blob.name}?raw download"
            )
        elif blob.size < MAX_DISPLAYED_BLOB_SIZE:
            # undecodable bytes are replaced rather than aborting the page
            text = blob.data.decode("utf-8", errors="replace")
            response += (
                f"=> {blob.name}?raw view raw\n\n"
                "```\n" + add_line_numbers(text) + "\n```"
            )
        else:
            response += (
                "This file is too large to be displayed. Open link below to download.\n"
                f"=> {blob.name}?raw download\n\n"
            )
        return response

    def view_raw_blob(self, branch: str, location=None) -> bytes:
        """Return the file's bytes preceded by a status line with its
        guessed MIME type."""
        blob = self._get_blob(branch, location)
        # if mimetypes can't make out the type, set it to plaintext
        guessed_mimetype = mimetypes.guess_type(blob.name)[0] or META_PLAINTEXT
        response = bytes(f"{STATUS_SUCCESS} {guessed_mimetype}\r\n", encoding="utf-8")
        response += blob.data
        return response


# ===========================================================================
# cgi-bin/git-gmi/index.gmi  (new file, mode 100644)
#
# Gemtext page, not Python; reproduced below as comments.  The runs of spaces
# inside the ASCII-art banner were lost when the diff was flattened, so the
# banner is shown with the spacing that survived.
# ===========================================================================
# # This is a git.gmi instance - a frontend for Git on Gemini
#
# ```git.gmi
#  _ _ _
#  (_) | | (_)
#  __ _ _ | |_ __ _ _ __ ___ _
#  / _` | | | | __| / _` | | '_ ` _ \ | |
# | (_| | | | | |_ _ | (_| | | | | | | | | |
#  \__, | |_| \__| (_) \__, | |_| |_| |_| |_|
#  __/ | __/ |
#  |___/ |___/
# ```
#
# => /cgi/ Repo index
#
# => gemini://git.fkfd.me/cgi/git.gmi/ Source code
# => https://git.sr.ht/~fkfd/git.gmi/ Source code (HTTPS)
#


# ===========================================================================
# cgi-bin/git-gmi/utils.py  (new file, mode 100644)
# ===========================================================================
import math


def add_line_numbers(code: str) -> str:
    """Prefix every line of *code* with its 1-based, right-aligned number.

    The numbers are padded to the width of the largest one and followed by
    a single space.  Text without lines is returned unchanged.
    """
    lines = code.splitlines()
    if not lines:
        return code  # empty anyway

    width = len(str(len(lines)))
    return "\n".join(
        f"{number:>{width}} {line}" for number, line in enumerate(lines, 1)
    )