import re
from datetime import datetime

from . import constants
from gus.lib.db_model import init_db, Page
from gus.lib.gemini import GeminiResource
from gus.lib.index_statistics import (
    compute_index_statistics,
    load_all_statistics_from_file,
)
from gus.lib.misc import bytes2human
import gus.lib.search as search

TEXT_CONTENT_TYPE = ["text/plain", "text/gemini", "text/markdown"]

# Matches search request paths: optional "/v" prefix (group 1), then
# "/search", then an optional "/<page number>" segment (group 2).
_SEARCH_PATH_RE = re.compile(r"^(/v)?/search(/\d+)?/?")

# Canonical scheme prefix written in front of every seed request.
_GEMINI_SCHEME = "gemini://"


class GUS:
    """Bundle the search index, the page database and precomputed listings."""

    def __init__(self):
        """Open the index and database and run the listing queries.

        The following attributes are populated:

        * ``index`` / ``db`` -- search index and database under
          ``constants.INDEX_DIR``.
        * ``statistics`` / ``statistics_historical_overall`` -- current and
          file-loaded index statistics.
        * ``hosts`` -- distinct domains of pages with
          ``last_success_status = 20``.
        * ``newest_hosts`` / ``newest_pages`` -- up to 50 domains / pages
          ordered by ``first_seen_at`` descending.
        * ``feeds`` -- pages whose URL or content type looks like a feed.
        """
        self.index = search.Index(constants.INDEX_DIR)
        self.db = init_db(f"{constants.INDEX_DIR}/{constants.DB_FILENAME}")
        self.statistics = compute_index_statistics(self.db)
        self.statistics_historical_overall = load_all_statistics_from_file(
            constants.STATISTICS_FILE)

        hosts_query = Page.raw(
            """
            SELECT DISTINCT p.domain
            FROM page AS p
            WHERE last_success_status = 20
            ORDER BY p.domain
            """
        )
        self.hosts = hosts_query.execute()

        newest_hosts_query = Page.raw(
            """
            SELECT p.domain, p.first_seen_at
            FROM page AS p
            WHERE last_success_status = 20
            AND first_seen_at IS NOT NULL
            GROUP BY p.domain
            ORDER BY first_seen_at DESC
            LIMIT 50
            """
        )
        self.newest_hosts = newest_hosts_query.execute()

        newest_pages_query = Page.raw(
            """
            SELECT p.url, p.first_seen_at
            FROM page as p
            WHERE last_success_status = 20
            AND first_seen_at IS NOT NULL
            ORDER BY first_seen_at DESC
            LIMIT 50
            """
        )
        self.newest_pages = newest_pages_query.execute()

        feeds_query = Page.raw(
            """
            SELECT DISTINCT p.url
            FROM page AS p
            WHERE last_success_status = 20
            AND (p.url LIKE '%atom.xml'
            OR p.url LIKE '%feed.xml'
            OR p.url LIKE '%rss.xml'
            OR p.url LIKE '%.rss'
            OR p.url LIKE '%.atom'
            OR p.url LIKE '%twtxt.txt'
            OR p.content_type IN ('application/atom+xml', 'application/rss+xml'))
            ORDER BY p.url
            """
        )
        self.feeds = feeds_query.execute()

    def search_index(self, query, requested_page):
        """Run ``query`` against the index and return one page of results.

        Args:
            query: Raw query string; it is parsed by the index.
            requested_page: Page number passed through to the index search
                (ten results per page).

        Returns:
            A ``(total, results)`` tuple where ``total`` is ``len()`` of the
            search result object and ``results`` is a list of dicts with the
            keys ``score``, ``indexed_at``, ``url``, ``content_type``,
            ``charset``, ``size``, ``prompt``, ``highlights``, ``link_text``
            and ``backlink_count``.
        """
        query = self.index.parse_query(query)
        results = self.index.search(query, requested_page, pagelen=10)
        return (
            len(results),
            [
                {
                    "score": result.score,
                    "indexed_at": result["indexed_at"],
                    "url": result["url"],
                    "content_type": result["content_type"],
                    # Optional stored fields fall back to neutral defaults.
                    "charset": result["charset"] if "charset" in result else "none",
                    "size": result["size"] if "size" in result else 0,
                    "prompt": result["prompt"] if "prompt" in result else "",
                    # Highlights are only requested for textual content types.
                    "highlights": self.index.highlight(result)
                    if result["content_type"] in TEXT_CONTENT_TYPE
                    else "",
                    "link_text": GUS._get_link_text(result),
                    "backlink_count": result["backlink_count"],
                }
                for result in results
            ],
        )

    def get_backlinks(self, url):
        """Return the URLs of pages that link to ``url``.

        The target is matched both with and without a trailing slash, and
        the page itself is excluded from its own backlinks.

        Args:
            url: URL of the page whose backlinks are wanted.

        Returns:
            A tuple ``(internal_urls, external_urls)`` of lists, split on the
            link's ``is_cross_host_like`` flag. Both lists are empty when
            ``url`` is not a valid resource.
        """
        resource = GeminiResource(url)
        if not resource.is_valid:
            return [], []

        u = resource.normalized_url.rstrip("/")
        backlinks_query = Page.raw(
            """
            SELECT p_from.url, l.is_cross_host_like
            FROM page AS p_from
            JOIN link as l ON l.from_page_id == p_from.id
            JOIN page as p_to ON p_to.id == l.to_page_id
            WHERE p_to.url IN (?, ?)
            AND p_from.url != ?
            GROUP BY p_from.url
            ORDER BY l.is_cross_host_like, p_from.url ASC
            """,
            u,
            f"{u}/",
            resource.normalized_url,
        )

        # Split in a single pass so the result only has to be iterated once.
        internal_backlink_urls = []
        external_backlink_urls = []
        for backlink in backlinks_query.execute():
            if backlink.is_cross_host_like:
                external_backlink_urls.append(backlink.url)
            else:
                internal_backlink_urls.append(backlink.url)
        return internal_backlink_urls, external_backlink_urls

    @staticmethod
    def _get_link_text(result):
        """Build the display text for a search result link.

        The first nine characters of the URL (the length of ``gemini://``)
        are dropped. Results with content type ``"input"`` show their prompt;
        all others show a human-readable size.

        Args:
            result: Mapping-like search hit providing ``url``,
                ``content_type`` and either ``prompt`` or ``size``.

        Returns:
            The formatted link text.
        """
        url_without_scheme = result["url"][9:]
        content_type = result["content_type"]
        if content_type == "input":
            return "{} ({}: {})".format(
                url_without_scheme, content_type, result["prompt"]
            )
        return "{} ({}, {})".format(
            url_without_scheme,
            content_type,
            bytes2human(result["size"], format="%(value).0f%(symbol)s"),
        )

    def get_search_suggestions(self, query):
        """Return the index's suggestions for ``query``."""
        return self.index.suggestions(query)


def compute_requested_results_page(request_path):
    """Extract the 1-based results page number from a search request path.

    Args:
        request_path: Request path such as ``/search/3`` or ``/v/search``.

    Returns:
        The page number found in the path, never less than 1. Paths without
        a page segment, or that are not search paths at all, yield 1.
    """
    m = _SEARCH_PATH_RE.match(request_path)
    if m is None or m.group(2) is None:
        return 1
    # Group 2 includes the leading slash, e.g. "/3".
    return max(int(m.group(2)[1:]), 1)


def compute_verbose(request_path):
    """Tell whether a search request path carries the ``/v`` prefix.

    Args:
        request_path: Request path such as ``/v/search`` or ``/search/2``.

    Returns:
        ``True`` when the path starts with ``/v/search``, otherwise
        ``False`` (including for paths that are not search paths).
    """
    m = _SEARCH_PATH_RE.match(request_path)
    return m is not None and m.group(1) is not None


def process_seed_request(seed_request):
    """Append a seed request to ``constants.SEED_REQUEST_FILE``.

    The request is written as one line with a lowercase ``gemini://``
    prefix: an existing prefix is matched case-insensitively and
    lowercased, and a missing one is added.

    Args:
        seed_request: URL (with or without scheme) submitted for seeding.
    """
    prefix_length = len(_GEMINI_SCHEME)
    if seed_request[:prefix_length].lower() == _GEMINI_SCHEME:
        # Canonicalise the scheme's case, keep the rest untouched.
        seed_request = _GEMINI_SCHEME + seed_request[prefix_length:]
    else:
        seed_request = "gemini://{}".format(seed_request)

    with open(constants.SEED_REQUEST_FILE, "a") as seed_file:
        seed_file.write("{}\n".format(seed_request))