going-flying.com gemini git repository
b0a93b413e8d9bfaf134ad0fbafe4f41d69d7b7e - Matthew Ernisse - 1612474200
feed update and start trying to see if git.gmi will work
diff --git a/capcom/feeds.txt b/capcom/feeds.txt index 1399667..06232a1 100644 --- a/capcom/feeds.txt +++ b/capcom/feeds.txt @@ -86,3 +86,5 @@ gemini://gemini.sensorstation.co/atom.xml gemini://gemini.ctrl-c.club/~nehrman/gemlog/atom.xml gemini://gemini.trans-neptunian.space/~smog/atom.xml gemini://freeside.wntrmute.net/log/index.rss +gemini://gemini.trans-neptunian.space/~littlejohn/atom.xml + diff --git a/cgi-bin/v0.4.1.tar.gz b/cgi-bin/v0.4.1.tar.gz deleted file mode 100644 index 9403323..0000000 Binary files a/cgi-bin/v0.4.1.tar.gz and /dev/null differ diff --git a/git/cgi b/git/cgi new file mode 100755 index 0000000..8777f65 --- /dev/null +++ b/git/cgi @@ -0,0 +1,3 @@ +#!usr/bin/env python3 +# gotta change the executable path before running +import gateway diff --git a/git/config.py b/git/config.py new file mode 100644 index 0000000..fbffc9a --- /dev/null +++ b/git/config.py @@ -0,0 +1,8 @@ +# where on the disk are the repos located +GIT_CATALOG = "/srv/git/" +# which path leads to your cgi app after the URL's host part +CGI_PATH = "/git/cgi/" +# your site's display name +GIT_GMI_SITE_TITLE = "going-flying.com git repository" +# the "main" branch that git.gmi defaults to +MAIN_BRANCH = "master" diff --git a/git/const.py b/git/const.py new file mode 100644 index 0000000..f1f757f --- /dev/null +++ b/git/const.py @@ -0,0 +1,6 @@ +STATUS_SUCCESS = "20" +STATUS_NOT_FOUND = "51 NOT FOUND" +STATUS_TEMPORARY_FAILURE = "40 TEMPORARY FAILURE" +META_GEMINI = "text/gemini" +META_PLAINTEXT = "text/plain" +MAX_DISPLAYED_BLOB_SIZE = 500 * 1024 # 500KB diff --git a/git/gateway.py b/git/gateway.py new file mode 100644 index 0000000..b0c9b0a --- /dev/null +++ b/git/gateway.py @@ -0,0 +1,96 @@ +from git import * +from const import * +from config import * +from os import environ, listdir +import sys + +# be careful when using print(); stdout is passed to the client. +# this cgi uses \n as newline. + + +def handle_cgi_request(path: str, query: str): + # intended to work with Jetforce. + # hypothetical example: + # url: gemini://git.gemini.site/git/cgi/repo/src/static/css/[index.css] + # path: /repo/src/static/css/[index.css] + # path_trace = ['repo', 'src', 'static', 'css', 'index.css'] + path_trace = path[1:].split("/") + if path_trace == [""]: # empty path + print(f"{STATUS_SUCCESS} {META_GEMINI}") # welcome page + print(f"# Welcome to {GIT_GMI_SITE_TITLE}") + print("## Available repositories:") + print("\n".join([f"=> {dir}/" for dir in listdir(GIT_CATALOG)])) + return + + try: + repo = GitGmiRepo(path_trace[0], f"{GIT_CATALOG}/{path_trace[0]}") + except FileNotFoundError: + print(STATUS_NOT_FOUND) + return + + if len(path_trace) > 1: + view = path_trace[1] # e.g. summary, tree, log + else: + # gemini://git.gemini.site/git/cgi/<repo>/ + print("31 summary") + return + + if view == "summary": + try: + print(repo.view_summary()) + except: + print(STATUS_TEMPORARY_FAILURE) + + elif view == "tree": + if len(path_trace) == 2: + # gemini://git.gemini.site/git/cgi/<repo>/tree/ + print(f"31 {MAIN_BRANCH}/") + + elif len(path_trace) > 2: + # gemini://git.gemini.site/git/cgi/<repo>/tree/<branch>/ + branch = path_trace[2] + + location = path_trace[3:] + + try: # is dir + print(repo.view_tree(branch, location)) + except FileNotFoundError: # is file + try: + if query == "raw": + sys.stdout.buffer.write(repo.view_raw_blob(branch, location)) + else: + print(repo.view_blob(branch, location)) + except FileNotFoundError: + print(STATUS_NOT_FOUND) + + elif view == "log": + try: + print(repo.view_log()) + except: + print(STATUS_TEMPORARY_FAILURE) + + elif view == "commit": + try: + commit_str = path_trace[2] + except IndexError: + print("50 No commit id given") + return + + try: + if query == "raw": + print(repo.view_raw_commit(commit_str)) + else: + print(repo.view_commit(commit_str)) + except FileNotFoundError: + print("50 No such commit") + except: + print(STATUS_TEMPORARY_FAILURE) + + elif view == "refs": + try: + print(repo.view_refs()) + except: + print(STATUS_TEMPORARY_FAILURE) + + +handle_cgi_request(environ.get("PATH_INFO"), environ.get("QUERY_STRING")) diff --git a/git/git.py b/git/git.py new file mode 100644 index 0000000..f249ec5 --- /dev/null +++ b/git/git.py @@ -0,0 +1,291 @@ +from pygit2 import * +from hurry.filesize import size, alternative +from datetime import datetime +import mimetypes +from const import * +from config import * + +mimetypes.add_type("text/gemini", ".gmi") +mimetypes.add_type("text/gemini", ".gemini") + + +def convert_filesize(bytes: int) -> str: + # convert filesize in bytes to a human-friendly format + return size(bytes, system=alternative) + + +class GitGmiRepo: + def __init__(self, name: str, path: str): + self.name = name + self.path = path + try: + self.repo = Repository(path) + except GitError: + raise FileNotFoundError(f"Error: no such repo: {name}") + + def generate_header(self): + # global "header" to display above all views (except raw files) + header = ( + f"# {self.name}\n" + f"=> {CGI_PATH} {GIT_GMI_SITE_TITLE}\n" + f"=> {CGI_PATH}{self.name}/summary summary\n" + f"=> {CGI_PATH}{self.name}/tree/{MAIN_BRANCH}/ tree\n" + f"=> {CGI_PATH}{self.name}/log log\n" + f"=> {CGI_PATH}{self.name}/refs refs\n\n" + ) + return header + + def view_summary(self) -> str: + response = f"{STATUS_SUCCESS} {META_GEMINI}\r\n" + self.generate_header() + # show 3 recent commits + recent_commits = self.get_commit_log()[:3] + for cmt in recent_commits: + time = str(datetime.utcfromtimestamp(cmt["time"])) + " UTC" + response += ( + f"### {cmt['short_id']} - {cmt['author']} - {time}\n" + f"{cmt['msg'].splitlines()[0]}\n\n" + ) # TODO: link to commit view + # find and display readme(.*) + tree = self.get_tree(MAIN_BRANCH) + trls = self.list_tree(tree) + found_readme = False + for item in trls: + if ( + item["type"] == "file" + and item["name"].lower().split(".")[0] == ("readme") + and not found_readme + ): + found_readme = True + response += ( + f"## {item['name']} | {convert_filesize(item['size'])}\n" + f"{item['blob'].data.decode('utf-8')}" + ) + if not found_readme: + response += "## No readme found." + return response + + def get_commit_log(self) -> list: + # returns useful info from commit log. + repo = self.repo + commits = list(repo.walk(repo[repo.head.target].id, GIT_SORT_TIME)) + log = [ + { + "id": str(cmt.id), # hex SHA-1 hash + "short_id": str(cmt.short_id), # short version of the above + "author": cmt.author.name, # author's display name + "time": cmt.commit_time, # unix timestamp + "msg": cmt.message, # full commit message + } + for cmt in commits + ] + + return log # reverse chronical order + + def view_log(self) -> str: + response = f"{STATUS_SUCCESS} {META_GEMINI}\r\n" + self.generate_header() + log = self.get_commit_log() + for cmt in log: + # looks like "2020-06-06 04:51:21 UTC" + time = str(datetime.utcfromtimestamp(cmt["time"])) + " UTC" + response += ( + f"## {cmt['short_id']} - {cmt['author']} - {time}\n" + f"=> commit/{cmt['id']} view diff\n" + f"=> tree/{cmt['id']}/ view tree\n" + f"{cmt['msg']}\n\n" + ) + return response + + def get_commit(self, commit_str) -> dict: + try: + commit = self.repo.revparse_single(commit_str) + diff = self.repo.diff(commit.parents[0], commit) + return { + "id": commit.id, + "author": commit.author.name, + "time": commit.commit_time, + "msg": commit.message, + "patch": diff.patch, + } + except ValueError: + raise FileNotFoundError(f"Error: no such commit: {commit_str}") + + def view_commit(self, commit_str) -> str: + commit = self.get_commit(commit_str) + response = ( + f"{STATUS_SUCCESS} {META_GEMINI}\r\n" + + self.generate_header() + + f"{commit['id']} - {commit['author']} - {commit['time']}\n" + + commit["msg"] + + "\n" + + f"=> {CGI_PATH}{self.name}/tree/{commit['id']}/ view tree\n" + + f"=> {commit_str}?raw view raw\n" + + "\n```\n" + + commit["patch"] + + "\n```" + ) + return response + + def view_raw_commit(self, commit_str) -> str: + commit = self.get_commit(commit_str) + response = f"{STATUS_SUCCESS} {META_PLAINTEXT}\r\n" + commit["patch"] + return response + + def get_refs(self) -> list: + refs = self.repo.listall_reference_objects() + return [ + { + "name": ref.name, + "shorthand": ref.shorthand, + "target": ref.target, + "type": ref.type, + } + for ref in refs + ] + + def view_refs(self) -> str: + response = f"{STATUS_SUCCESS} {META_GEMINI}\r\n" + self.generate_header() + refs = self.get_refs() + for ref in refs: + # HACK: filter out refs with slashes as remote branches + if ref["shorthand"].find("/") == -1: + response += ( + f"## {ref['shorthand']}\n=> tree/{ref['shorthand']}/ view tree\n\n" + ) + return response + + @classmethod + def parse_recursive_tree(cls, tree: Tree) -> list: + # recursively replace all Trees with a list of Blobs inside it, + # bundled with the Tree's name as a tuple, + # e.g. [('src', [blob0, blob1]), otherblob]. + tree_list = list(tree) + for idx, item in enumerate(tree_list): + if isinstance(item, Tree): + tree_list[idx] = (item.name, cls.parse_recursive_tree(tree_list[idx])) + + return tree_list + + def get_tree(self, revision_str: str) -> list: + # returns a recursive list of Blob objects + try: + revision = self.repo.revparse_single(revision_str) + if isinstance(revision, Commit): + # top level tree; may contain sub-trees + return self.parse_recursive_tree(revision.tree) + elif isinstance(revision, Tag): + return self.parse_recursive_tree(revision.get_object().tree) + except ValueError: + raise FileNotFoundError(f"Error: no such tree: {revision_str}") + return None + + @staticmethod + def list_tree(tree_list: list, location=[]) -> list: + # tree_list is the output of parse_recursive_tree(<tree>); + # location is which dir you are viewing, represented path-like + # in a list, e.g. ['src', 'static', 'css'] => 'src/static/css', + # which this method will cd into and display to the visitor. + # when there is no such dir, raises FileNotFoundError. + trls = tree_list + for loc in location: + found = False + for item in trls: + if isinstance(item, tuple) and item[0] == loc: + trls = item[1] + found = True + break + if not found: + raise FileNotFoundError( + f"Error: no such directory: {'/'.join(location)}" + ) + + contents = [] + for item in trls: + if isinstance(item, tuple): + # was originally a Tree; structure: ('dir_name', [list_of_blobs]) + contents.append( + { + "type": "dir", + "name": item[0], + "items": len(item[1]), # number of objects in dir + } + ) + + elif isinstance(item, Blob): + contents.append( + { + "type": "file", + "name": item.name, + "blob": item, + "size": item.size, # size in bytes + } + ) + + return contents + + def view_tree(self, branch: str, location=[]) -> str: + # actual Gemini response + # consists of a header and a body + tree = self.get_tree(branch) + contents = self.list_tree(tree, location) + items = len(contents) + response = ( + f"{STATUS_SUCCESS} {META_GEMINI}\r\n" + + self.generate_header() + + f"## {self.name}{'/' if location else ''}{'/'.join(location)}/" + f" | {items} {'items' if items > 1 else 'item'}\n\n" + ) + for item in contents: + if item["type"] == "dir": + response += ( + f"=> {item['name']}/ {item['name']}/ | {item['items']} items\n" + ) + elif item["type"] == "file": + response += f"=> {item['name']} {item['name']} | {convert_filesize(item['size'])}\n" + return response + + def get_blob(self, commit_str: str, location=[]) -> Blob: + # returns a specific Blob object + # location: just like that of list_tree, but the last element + # is the filename + try: + tree = self.get_tree(commit_str) + trls = self.list_tree(tree, location[:-1]) + for item in trls: + if item["type"] == "file" and item["name"] == location[-1]: + return item["blob"] + raise FileNotFoundError(f"Error: no such file: {'/'.join(location)}") + except FileNotFoundError: + raise FileNotFoundError(f"Error: No such tree: {'/'.join(location[:-1])}") + + def view_blob(self, branch: str, location=[]) -> str: + blob = self.get_blob(branch, location) + response = ( + f"{STATUS_SUCCESS} {META_GEMINI}\r\n" + + self.generate_header() + + f"## {self.name}/{'/'.join(location)} | {convert_filesize(blob.size)}\n\n" + ) + + if blob.is_binary: + response += ( + "This file seems to be binary. Open link below to download.\n" + f"=> {blob.name}?raw download" + ) + elif blob.size < MAX_DISPLAYED_BLOB_SIZE: + response += ( + f"=> {blob.name}?raw view raw\n\n" + "```\n" + blob.data.decode("utf-8") + "\n```" + ) + else: + response += ( + "This file is too large to be displayed. Open link below to download.\n" + f"=> {blob.name}?raw download\n\n" + ) + return response + + def view_raw_blob(self, branch: str, location=[]) -> bytes: + blob = self.get_blob(branch, location) + # if mimetypes can't make out the type, set it to plaintext + guessed_mimetype = mimetypes.guess_type(blob.name)[0] or META_PLAINTEXT + response = bytes(f"{STATUS_SUCCESS} {guessed_mimetype}\r\n", encoding="utf-8") + response += blob.data + return response