💾 Archived View for any-key.press › vostok › atom2gemfeed.py captured on 2024-09-29 at 01:23:31.

View Raw

More Information

-=-=-=-=-=-=-

#!/usr/bin/env python3
"""Generate a Gemini feed from an Atom feed.

A request URL is read from stdin; its percent-encoded query string is the
``gemini://`` URL of the Atom document to convert.  The Gemini response
(header line plus a ``text/gemini`` body) is written to stdout.

=> gemini://geminiprotocol.net/docs/companion/subscription.gmi Documentation
=> gemini://any-key.press/vgi/atom2gemfeed/?gemini%3A%2F%2Fany-key.press%2Fatom.xml Instance
"""
import xml.etree.ElementTree as ET
from datetime import datetime
from email.message import Message
from socket import create_connection
from ssl import SSLContext, CERT_NONE, PROTOCOL_TLS_CLIENT
from urllib.parse import urlsplit, urljoin, uses_relative, uses_netloc, unquote


# urljoin() only resolves relative references for schemes listed in these
# urllib.parse registries, so add "gemini" to both of them.
for _registry in (uses_relative, uses_netloc):
    _registry.append("gemini")


def _endpoint(url):
    """Return the ``(host, port)`` pair to connect to for Gemini *url*.

    The port falls back to 1965 when the URL does not name one.

    Raises:
        ValueError: if *url* cannot be split, is not a ``gemini`` URL, has no
            host, or carries an invalid port.  ``_main`` prints the message
            in the response header.
    """
    parts = urlsplit(url)
    if parts.scheme != "gemini":
        raise ValueError("Only Gemini links are allowed")
    if not parts.hostname:
        raise ValueError("Gemini link has no host")
    # Reading ``.port`` raises ValueError for a non-numeric/out-of-range port.
    return parts.hostname, parts.port or 1965


def _local_name(tag):
    """Strip the ``{namespace}`` prefix from an ElementTree tag name."""
    return tag.rpartition("}")[2]


def _one_line(text):
    """Collapse whitespace runs in *text* (may be ``None``) to single spaces.

    Feed text can contain newlines and indentation; a gemtext heading or
    link must stay on one line.
    """
    return " ".join((text or "").split())


def _date_prefix(updated):
    """Return ``"YYYY-MM-DD - "`` for an Atom timestamp, or ``""`` if unusable."""
    stamp = (updated or "").strip()
    if not stamp:
        return ""
    if stamp.endswith(("Z", "z")):
        # ``datetime.fromisoformat`` rejects the "Z" suffix before Python 3.11.
        stamp = f"{stamp[:-1]}+00:00"
    try:
        return f"{datetime.fromisoformat(stamp).date().isoformat()} - "
    except ValueError:
        # Best effort: an unparsable date just means the link has no date.
        return ""


def _print_gemfeed(payload, mime, raw_url):
    """Parse Atom *payload* (bytes) and print the Gemini response to stdout.

    *mime* is the meta field of the remote response; its ``charset``
    parameter (default UTF-8) selects the decoding.  *raw_url* is used as the
    heading when the feed has no usable title.  Prints a ``43`` header instead
    of a feed when the payload cannot be decoded or parsed.
    """
    content_type = Message()
    content_type["content-type"] = mime
    charset = content_type.get_param("charset")
    if not charset or not isinstance(charset, str):
        charset = "UTF-8"

    children = {}
    try:
        # NOTE(review): the payload is untrusted XML; see the "XML security"
        # notes in the Python documentation before widening what is parsed.
        for child in ET.fromstring(payload.decode(charset)):
            children.setdefault(_local_name(child.tag), []).append(child)
    except (ET.ParseError, LookupError, UnicodeDecodeError):
        # LookupError: unknown charset name; UnicodeDecodeError: bad bytes.
        print("43 Parse Atom error\r")
        return

    print("20 text/gemini\r")
    titles = children.get("title")
    heading = _one_line(titles[0].text) if titles else ""
    print(f"# {heading or raw_url}\r")
    print("\r")
    for entry in children.get("entry", []):
        # When a tag repeats inside an entry, the last occurrence wins.
        fields = {_local_name(child.tag): child.text for child in entry}
        link = _one_line(fields.get("id"))
        if not link:
            continue
        date_prefix = _date_prefix(fields.get("updated"))
        label = _one_line(fields.get("title"))
        print(f"=> {link} {date_prefix}{label}\r")


def _main(raw_url):
    """Fetch the Atom feed at Gemini URL *raw_url* and print it as a Gemini feed.

    The complete response (header line and, on success, a ``text/gemini``
    body) is printed to stdout with CRLF line endings.  At most six requests
    are made while following redirects; every redirect target is validated
    and connected to at its own host and port.

    Response headers printed on failure:
        ``59`` - *raw_url* is not a usable ``gemini://`` URL.
        ``43`` - the remote server answered with an error, an empty header, a
        non-text/XML media type, an unusable redirect, too many redirects, or
        a body that is not parsable XML.

    Connection errors (``OSError``) are not caught here.
    """
    try:
        host, port = _endpoint(raw_url)
    except ValueError as exc:
        print(f"59 {exc}\r")
        return

    # The server certificate is deliberately not verified.
    context = SSLContext(PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = CERT_NONE

    for _ in range(6):
        with create_connection((host, port)) as raw_conn:
            with context.wrap_socket(raw_conn, server_hostname=host) as conn:
                conn.sendall((raw_url + "\r\n").encode("UTF-8"))
                with conn.makefile("rb") as fp:
                    header = (
                        fp.readline().decode("UTF-8", "replace").strip().split(maxsplit=1)
                    )
                    if not header:
                        print("43 Remote server error: empty response\r")
                        return

                    status = header[0]
                    if status.startswith("3") and len(header) == 2:
                        # Redirect: the next request must go to the *new*
                        # URL's host and port, which must still be Gemini.
                        raw_url = urljoin(raw_url, header[1])
                        try:
                            host, port = _endpoint(raw_url)
                        except ValueError as exc:
                            print(f"43 Bad redirect to {raw_url}: {exc}\r")
                            return
                        continue

                    if not status.startswith("2"):
                        print(f"43 Remote server error: {' '.join(header)}\r")
                        return

                    mime = header[1].lower() if len(header) == 2 else "text/gemini"
                    if not mime.startswith(("text/", "application/xml", "application/atom")):
                        print(f"43 Only links to `text/*` are allowed: {mime}\r")
                        return

                    # gemini://geminiprotocol.net/docs/companion/subscription.gmi
                    _print_gemfeed(fp.read(), mime, raw_url)
                    return

    # Every request was answered with another redirect.
    print("43 Too many redirects\r")


if __name__ == "__main__":
    # Read one line from stdin, treat it as a URL, and use its
    # percent-decoded query string as the feed URL to convert.
    request_line = input().strip()
    encoded_target = urlsplit(request_line).query
    _main(unquote(encoded_target))