💾 Archived View for gmn.clttr.info › sources › geminispace.git › tree › gus › lib › search.py.txt captured on 2021-12-05 at 23:47:19.

View Raw

More Information

⬅️ Previous capture (2021-12-03)

➡️ Next capture (2022-04-28)

-=-=-=-=-=-=-

from urllib.parse import quote
import pathlib
import logging

import whoosh.qparser
import whoosh.highlight
from whoosh.analysis import FancyAnalyzer
from whoosh.fields import Schema, TEXT, DATETIME, STORED, ID, NUMERIC
from whoosh.filedb.filestore import FileStorage

from gus.lib.whoosh_extensions import UrlAnalyzer, GeminiFormatter, GeminiScorer


class Index:
    """Thin wrapper around a Whoosh index stored on disk.

    One instance bundles the index itself, a searcher, a query parser, a
    highlighter and a lazily created writer that is shared by all write
    operations until ``close()`` commits it.
    """

    def __init__(self, index_dir, should_run_destructive=False):
        """Open the index in ``index_dir``, or create a fresh one.

        Args:
            index_dir: Directory holding (or about to hold) the index files.
            should_run_destructive: When true, the directory is created if
                needed and a new index is created via ``_create``; otherwise
                the existing index is opened.
        """
        index_storage = FileStorage(index_dir)

        if should_run_destructive:
            pathlib.Path(index_dir).mkdir(parents=True, exist_ok=True)
            self._index = self._create(index_storage)
        else:
            self._index = index_storage.open_index()

        # The searcher is opened once here; nothing in this class refreshes
        # it after documents are written.
        self._searcher = self._index.searcher()

        # Free-text queries are matched against content, url and prompt.
        self._query_parser = whoosh.qparser.MultifieldParser(
            ["content", "url", "prompt"],
            self._index.schema,
            group=whoosh.qparser.OrGroup.factory(0.99),
        )
        # Query syntax tuning: regex and </> comparisons are enabled,
        # wildcards, boosts and range expressions are disabled.
        self._query_parser.add_plugin(whoosh.qparser.RegexPlugin())
        self._query_parser.add_plugin(whoosh.qparser.GtLtPlugin())
        self._query_parser.remove_plugin_class(whoosh.qparser.WildcardPlugin)
        self._query_parser.remove_plugin_class(whoosh.qparser.BoostPlugin)
        self._query_parser.remove_plugin_class(whoosh.qparser.RangePlugin)

        self._highlighter = whoosh.highlight.Highlighter(
            formatter=GeminiFormatter(),
            fragmenter=whoosh.highlight.ContextFragmenter(maxchars=160, surround=80),
            scorer=GeminiScorer(),
            order=whoosh.highlight.SCORE,
        )

        # Created on first write, see _rolling_writer().
        self._writer = None

    def _create(self, index_storage):
        """Create a new index with the GUS schema in ``index_storage``."""
        schema = Schema(
            url_id=ID(unique=True, stored=True),
            url=TEXT(field_boost=2.0, stored=True, analyzer=UrlAnalyzer()),
            fetchable_url=STORED(),
            domain=TEXT(analyzer=UrlAnalyzer()),
            port=NUMERIC(int, 32, signed=False, stored=True),
            content_type=TEXT(stored=True),
            charset=ID(stored=True),
            lang=ID(stored=True),
            content=TEXT(analyzer=FancyAnalyzer(), spelling=True, stored=True),
            prompt=TEXT(analyzer=FancyAnalyzer(), stored=True),
            size=NUMERIC(
                int,
                # 32 unsigned bits: GUS will have problems indexing responses
                # whose size exceeds 2**32 - 1
                32,
                signed=False,
                stored=True,
            ),
            backlink_count=NUMERIC(
                int, 16, signed=False, stored=True,  # num bits, so max value is 65k
            ),
            indexed_at=DATETIME(stored=True),
        )
        return index_storage.create_index(schema)

    def close(self):
        """Commit pending writes and release the searcher and the index.

        The writer reference is dropped after the commit so that a later
        write opens a new writer instead of reusing a finished one. The
        searcher and index are closed even when the commit raises.
        """
        writer, self._writer = self._writer, None
        try:
            if writer is not None:
                writer.commit()
        finally:
            try:
                self._searcher.close()
            finally:
                self._index.close()

    def _rolling_writer(self):
        """Return the shared writer, opening it on first use."""
        if self._writer is None:
            self._writer = self._index.writer()

        return self._writer

    def add_document(self, document):
        """Add or replace a document.

        Args:
            document: Mapping of schema field names to values; it is passed
                as keyword arguments to the writer's ``update_document``.
        """
        self._rolling_writer().update_document(**document)

    def delete_by_term(self, key, val):  # TODO delete_document
        """Delete all documents whose field ``key`` contains term ``val``."""
        self._rolling_writer().delete_by_term(key, val, searcher=None)

    def parse_query(self, query):
        """Parse a user query string with the configured query parser."""
        return self._query_parser.parse(query)

    def highlight(self, result):
        """Return the best highlighted excerpt of a hit's content.

        Returns an empty string when the hit carries no ``content`` field.
        """
        if "content" not in result:
            return ""
        return self._highlighter.highlight_hit(result, "content", top=1)

    def search(self, query, pagenr, pagelen=10):
        """Run a parsed query and return page ``pagenr`` of the results.

        Args:
            query: Query object, e.g. as produced by ``parse_query``.
            pagenr: Page number, forwarded to the searcher's ``search_page``.
            pagelen: Number of hits per page.
        """
        return self._searcher.search_page(query, pagenr, pagelen)

    def suggestions(self, query):
        """Collect spelling suggestions for every word of ``query``.

        Args:
            query: Raw query string; it is split on whitespace.

        Returns:
            A list of dicts with the suggestion as typed (``raw``) and
            URL-quoted (``quoted``), up to three per query word.
        """
        suggestions = []
        corrector = self._searcher.corrector("content")
        # split() without an argument drops the empty tokens that repeated,
        # leading or trailing spaces would otherwise feed to the corrector.
        for query_part in query.split():
            query_part_suggestions = corrector.suggest(query_part, limit=3)
            suggestions.extend(
                {"raw": suggestion, "quoted": quote(suggestion)}
                for suggestion in query_part_suggestions
            )
        return suggestions