💾 Archived View for gmn.clttr.info › sources › geminispace.info.git › tree › gus › lib › search.py.t… captured on 2023-01-29 at 05:07:36.

View Raw

More Information

➡️ Next capture (2023-06-14)

-=-=-=-=-=-=-

from urllib.parse import quote
import pathlib
import logging

import whoosh.qparser
import whoosh.highlight
from whoosh.analysis import FancyAnalyzer
from whoosh.fields import Schema, TEXT, DATETIME, STORED, ID, NUMERIC
from whoosh.filedb.filestore import FileStorage

from gus.lib.whoosh_extensions import UrlAnalyzer, GeminiFormatter, GeminiScorer


class Index:
    def __init__(self, index_dir, should_run_destructive=False):
        index_storage = FileStorage(index_dir, supports_mmap=False)
        self._destructive = should_run_destructive

        if self._destructive:
            pathlib.Path(index_dir).mkdir(parents=True, exist_ok=True)
            self._index = self._create(index_storage)
        else:
            self._index = index_storage.open_index()

        self._searcher = self._index.searcher()

        self._query_parser = whoosh.qparser.MultifieldParser(
            ["content", "url", "prompt"],
            self._index.schema,
            group=whoosh.qparser.OrGroup.factory(0.99),
        )
        self._query_parser.add_plugin(whoosh.qparser.RegexPlugin())
        self._query_parser.add_plugin(whoosh.qparser.GtLtPlugin())
        self._query_parser.remove_plugin_class(whoosh.qparser.WildcardPlugin)
        self._query_parser.remove_plugin_class(whoosh.qparser.BoostPlugin)
        self._query_parser.remove_plugin_class(whoosh.qparser.RangePlugin)

        self._highlighter = whoosh.highlight.Highlighter(
            formatter=GeminiFormatter(),
            fragmenter=whoosh.highlight.ContextFragmenter(maxchars=160, surround=80),
            scorer=GeminiScorer(),
            order=whoosh.highlight.SCORE,
        )

        self._writer = None

    def _create(self, index_storage):
        schema = Schema(
            url_id=ID(unique=True, stored=True),
            url=TEXT(field_boost=2.0, stored=True, analyzer=UrlAnalyzer()),
            fetchable_url=STORED(),
            domain=TEXT(analyzer=UrlAnalyzer()),
            port=NUMERIC(int, 32, signed=False, stored=True),
            content_type=TEXT(stored=True),
            charset=ID(stored=True),
            lang=ID(stored=True),
            content=TEXT(analyzer=FancyAnalyzer(), spelling=True, stored=True),
            prompt=TEXT(analyzer=FancyAnalyzer(), stored=True),
            size=NUMERIC(
                int,
                # this means GUS will have problems indexing responses over ~2GB
                32,
                signed=False,
                stored=True,
            ),
            backlink_count=NUMERIC(
                int, 16, signed=False, stored=True,  # num bits, so max value is 65k
            ),
            indexed_at=DATETIME(stored=True),
        )
        return index_storage.create_index(schema)

    def close(self):
        if self._writer:
            self._writer.commit()
        self._index.close()

    def _rolling_writer(self):
        if not self._writer:
            self._writer = self._index.writer(limitmb=1024, procs=3, multisegment=self._destructive)
        return self._writer

    def add_document(self, document):
        self._rolling_writer().update_document(**document)

    def delete_by_term(self, key, val):
        self._rolling_writer().delete_by_term(key, val, searcher=None)

    def parse_query(self, query):
        return self._query_parser.parse(query)

    def highlight(self, result):
        if "content" in result:
            return self._highlighter.highlight_hit(result, "content", top=1)
        else:
            return ""

    def search(self, query, pagenr, pagelen=10):
        return self._searcher.search_page(query, pagenr, pagelen)

    def suggestions(self, query):
        suggestions = []
        corrector = self._searcher.corrector("content")
        for query_part in query.split(" "):
            query_part_suggestions = corrector.suggest(query_part, limit=3)
            suggestions.extend(
                {"raw": suggestion, "quoted": quote(suggestion)}
                for suggestion in query_part_suggestions
            )
        return suggestions