💾 Archived View for gmn.clttr.info › sources › geminispace.info.git › tree › serve › models.py.txt captured on 2023-06-14 at 14:31:33.

View Raw

More Information

⬅️ Previous capture (2023-04-20)

➡️ Next capture (2023-09-08)

🚧 View Differences

-=-=-=-=-=-=-

import re
from datetime import datetime

from . import constants
from gus.lib.db_model import init_db, Page
from gus.lib.gemini import GeminiResource
from gus.lib.index_statistics import (
    compute_index_statistics,
    load_all_statistics_from_file,
)
from gus.lib.misc import bytes2human
import gus.lib.search as search

TEXT_CONTENT_TYPE = ["text/plain", "text/gemini", "text/markdown"]


class GUS:
    """Facade over the search index and the crawl database.

    Construction opens the full-text index and the page DB, snapshots the
    index statistics, and pre-loads the host / newest-host / newest-page /
    feed lists used by the front pages.  None of these are refreshed after
    startup.
    """

    def __init__(self):
        self.index = search.Index(constants.INDEX_DIR)
        self.db = init_db(f"{constants.INDEX_DIR}/{constants.DB_FILENAME}")
        # Statistics are computed once at startup, not per-request.
        self.statistics = compute_index_statistics(self.db)
        self.statistics_historical_overall = load_all_statistics_from_file(
            constants.STATISTICS_FILE)
        # All queries below only consider pages whose last crawl succeeded
        # (Gemini status 20).
        hosts_query = Page.raw(
            """
            SELECT DISTINCT p.domain
            FROM page AS p
            WHERE last_success_status = 20
            ORDER BY p.domain
            """
        )
        self.hosts = hosts_query.execute()

        newest_hosts_query = Page.raw(
            """
            SELECT p.domain, p.first_seen_at
            FROM page AS p
            WHERE last_success_status = 20
            AND first_seen_at IS NOT NULL
            GROUP BY p.domain
            ORDER BY first_seen_at DESC
            LIMIT 50
            """
        )
        self.newest_hosts = newest_hosts_query.execute()

        newest_pages_query = Page.raw(
            """SELECT p.url, p.first_seen_at FROM page as p
            WHERE last_success_status = 20
            AND first_seen_at IS NOT NULL
            ORDER BY first_seen_at DESC
            LIMIT 50""")
        self.newest_pages = newest_pages_query.execute()

        # Feeds are detected by conventional file names plus explicit
        # feed content types.
        feeds_query = Page.raw(
            """SELECT DISTINCT p.url
            FROM page AS p
            WHERE last_success_status = 20
            AND (p.url LIKE '%atom.xml'
            OR p.url LIKE '%feed.xml'
            OR p.url LIKE '%rss.xml'
            OR p.url LIKE '%.rss'
            OR p.url LIKE '%.atom'
            OR p.url LIKE '%twtxt.txt'
            OR p.content_type IN ('application/atom+xml', 'application/rss+xml'))
            ORDER BY p.url
            """)
        self.feeds = feeds_query.execute()

    def search_index(self, query, requested_page):
        """Run a full-text search and return (total_hits, result_dicts).

        `requested_page` is the 1-based results page; 10 hits per page.
        Each result dict carries display fields; optional index fields
        (charset, size, prompt) fall back to placeholder values, and
        highlights are only generated for textual content types.
        """
        query = self.index.parse_query(query)
        results = self.index.search(query, requested_page, pagelen=10)
        return (
            len(results),
            [
                {
                    "score": result.score,
                    "indexed_at": result["indexed_at"],
                    "url": result["url"],
                    "content_type": result["content_type"],
                    "charset": result["charset"] if "charset" in result else "none",
                    "size": result["size"] if "size" in result else 0,
                    "prompt": result["prompt"] if "prompt" in result else "",
                    "highlights": self.index.highlight(result) if result["content_type"] in TEXT_CONTENT_TYPE else "",
                    "link_text": GUS._get_link_text(result),
                    "backlink_count": result["backlink_count"],
                }
                for result in results
            ],
        )

    def get_backlinks(self, url):
        """Return (internal_urls, external_urls) of pages linking to `url`.

        Invalid URLs yield two empty lists.  Both the trailing-slash and
        slashless forms of the normalized URL are matched, and self-links
        are excluded.
        """
        resource = GeminiResource(url)
        if not resource.is_valid:
            return [], []

        u = resource.normalized_url.rstrip("/")
        backlinks_query = Page.raw(
            """SELECT p_from.url, l.is_cross_host_like
            FROM page AS p_from
            JOIN link as l ON l.from_page_id == p_from.id
            JOIN page as p_to ON p_to.id == l.to_page_id
            WHERE p_to.url IN (?, ?)
            AND p_from.url != ?
            GROUP BY p_from.url
            ORDER BY l.is_cross_host_like, p_from.url ASC""",
            u,
            f"{u}/",
            resource.normalized_url,
        )
        backlinks = backlinks_query.execute()

        internal_backlink_urls = [b.url for b in backlinks if not b.is_cross_host_like]
        external_backlink_urls = [b.url for b in backlinks if b.is_cross_host_like]
        return internal_backlink_urls, external_backlink_urls

    @staticmethod
    def _get_link_text(result):
        """Build the display text for a search result link.

        The leading "gemini://" (9 chars) is stripped from the URL.  Input
        prompts show the prompt text; everything else shows content type
        and a human-readable size.
        """
        # Fix: declared @staticmethod — it is always invoked as
        # GUS._get_link_text(result) and never uses instance state.
        if result["content_type"] == "input":
            prompt_suffix = ": {}".format(result["prompt"])
            link_text = "{} ({}{})".format(
                result["url"][9:], result["content_type"], prompt_suffix
            )
        else:
            link_text = "{} ({}, {})".format(
                result["url"][9:],
                result["content_type"],
                bytes2human(result["size"], format="%(value).0f%(symbol)s"),
            )
        return link_text

    def get_search_suggestions(self, query):
        """Return spelling/completion suggestions for `query` from the index."""
        return self.index.suggestions(query)


def compute_requested_results_page(request_path):
    """Extract the 1-based results page number from a search request path.

    Paths look like "/search", "/search/2", or "/v/search/3/".  Returns 1
    when no page number is present, when the path is not a search path at
    all, or when the number given is below 1.
    """
    # Raw string: "\d" in a plain literal is a DeprecationWarning.
    m = re.match(r"^(/v)?/search(/\d+)?/?", request_path)
    # Fix: the original dereferenced m.group() unconditionally and raised
    # AttributeError on any non-matching path.
    if m is None or m.group(2) is None:
        return 1
    return max(int(m.group(2)[1:]), 1)


def compute_verbose(request_path):
    """Return True when the search request path has the "/v" verbose prefix."""
    # Raw string: "\d" in a plain literal is a DeprecationWarning.
    m = re.match(r"^(/v)?/search(/\d+)?/?", request_path)
    # Fix: the original dereferenced m.group() unconditionally and raised
    # AttributeError on any non-matching path.
    return m is not None and m.group(1) is not None


def process_seed_request(seed_request):
    """Append a crawl-seed URL to the seed-request file.

    The request is normalized to start with "gemini://": a scheme written
    in any letter case is lowercased, and a bare hostname/path gets the
    scheme prepended.
    """
    # Fix: the original only rewrote the exact spelling "Gemini://", so
    # e.g. "GEMINI://host" ended up as "gemini://GEMINI://host".
    if seed_request[:9].lower() == "gemini://":
        seed_request = "gemini://" + seed_request[9:]
    else:
        seed_request = "gemini://{}".format(seed_request)
    with open(constants.SEED_REQUEST_FILE, "a") as seed_file:
        seed_file.write("{}\n".format(seed_request))