💾 Archived View for gmn.clttr.info › sources › geminispace.info.git › tree › gus › lib › gemini.py.t… captured on 2024-02-05 at 10:08:43.

View Raw

More Information

⬅️ Previous capture (2023-12-28)

-=-=-=-=-=-=-

import re
from urllib.parse import (
    quote,
    unquote,
    urljoin,
    urlparse,
    urlsplit,
    urlunparse,
    urlunsplit,
    uses_relative,
    uses_netloc,
)
from urllib.robotparser import RobotFileParser

import gusmobile
from gus import constants
from gus.lib.domain import is_domain

# hack: the built-in methods in urllib need to know the
# Gemini protocol exists
uses_relative.append("gemini")
uses_netloc.append("gemini")

LOG_ROOT_LIKE_PATTERN = re.compile(
    r".*/(gemlog|glog|journal|twinlog|posts|post|tangents|phlog|starlog|pikkulog|blog|log)/?$",
    flags=re.IGNORECASE,
)
LOG_POST_LIKE_PATTERN = re.compile(
    r".*/((gemlog|glog|journal|twinlog|posts|post|tangents|phlog|starlog|pikkulog|blog|log)/.+$|.*/\d{4}[-_]\d{2}[-_]\d{2}.*$|.*/(19|20)\d{6}.*$)",
    flags=re.IGNORECASE,
)
LOG_POST_LIKE_EXCLUSION_PATTERN = re.compile(
    r".*/(games|archive|archives|rss|handlers|diagnostics)/.*|.*atom.xml$|.*gemlog.gmi$|.*index.gmi$|.*index.gemini$",
    flags=re.IGNORECASE,
)
LOG_POST_GEMLOGBLUE_LIKE_PATTERN = re.compile(
    r"^/users/[a-z][-a-z0-9]*/\d+\.gmi?", flags=re.IGNORECASE
)
LOG_POST_BOSTON_LIKE_PATTERN = re.compile(
    r"^/boston/\d{4}/\d{2}/\d+\.\d+", flags=re.IGNORECASE
)

ROOT_LIKE_ONLY_PATTERN = re.compile(
    r"^/(~[a-z][-a-z0-9]*|users/[a-z][-a-z0-9]*|users)/?$", flags=re.IGNORECASE
)
ROOT_LIKE_PATTERN = re.compile(
    r"^/(~[a-z][-a-z0-9]*|users/[a-z][-a-z0-9]*|users)/?", flags=re.IGNORECASE
)

AUTHOR_URL_PATTERN = re.compile(
    r"^/~([a-z][-a-z0-9]*)/|^/users/~?([a-z][-a-z0-9]*)", flags=re.IGNORECASE
)
AUTHOR_CONTENT_PATTERN = re.compile(
    r".*(by|author): ([\w\s\d]+)", flags=re.IGNORECASE | re.MULTILINE
)

TITLE_CONTENT_PATTERN = re.compile(r"^#\s(.*)$", flags=re.IGNORECASE | re.MULTILINE)
TITLE_URL_PATTERN = re.compile(
    r".*/(\d{8}[-_]|\d{4}[-_]\d{2}[-_]\d{2}[-_])?([a-z0-9-_]+)(\.[a-z0-9]+)$",
    flags=re.IGNORECASE,
)


class GeminiRobotFileParser(RobotFileParser):
    def set_url(self, url):
        """Sets the URL referring to a robots.txt file."""
        self.url = url
        u, _ = GeminiResource.urlsplit_featureful(url)
        self.host, self.path = u[1:3]

    def read(self):
        """Reads the robots.txt URL and feeds it to the parser."""
        gr = GeminiResource(self.url)
        response = gr.fetch()
        if response is None:
            self.allow_all = True
            return
        if not response.status.startswith("2") or not response.content_type.startswith("text/"):
            self.allow_all = True
        else:
            self.parse(response.content.splitlines())

    def read_from_string(self, robots_txt):
        """An utility method for writing tests"""
        self.parse(robots_txt.splitlines())

    def can_fetch_prioritized(self, useragents, url):
        """Given a url and prioritized list of user-agents, is fetching allowed?

        Priority is with the highest priority first; eg. ["ThisIndexerBot", "generic-indexer", "generic-bot", "*"].
        """
        if self.allow_all:
            return True
        if self.disallow_all:
            return False

        if not self.last_checked:
            return False

        parsed_url = urlparse(unquote(url))
        url = urlunparse(('','',parsed_url.path, parsed_url.params,parsed_url.query, parsed_url.fragment))
        url = quote(url) or "/"

        def useragent_allowed(useragent):
            for entry in self.entries:
                if entry.applies_to(useragent):
                    return entry.allowance(url)
            return None

        # map user-agents to allowances; the first non-None will be the prioritized allowance
        for ua in useragents:
            allowed = useragent_allowed(ua)
            if allowed is not None:
                return allowed

        # if none of the user-agents match, check default entry
        if self.default_entry:
            return self.default_entry.allowance(url)

        # if nothing matches, crawling is allowed
        return True

class GeminiResource:
    def __init__(self, url, fully_qualified_parent_url=None, parent_hostname=None):
        self.raw_url = url
        self.urlsplit, self.is_relative = GeminiResource.urlsplit_featureful(
            url,
            fully_qualified_parent_url=fully_qualified_parent_url,
            parent_hostname=parent_hostname,
        )
        self.is_valid = self.urlsplit is not None
        self.fully_qualified_parent_url = fully_qualified_parent_url
        self._normalized_host = None
        self._normalized_host_like = None
        self._fetchable_url = None
        self._is_root_like = None
        self._is_log_root_like = None
        self._is_log_post_like = None
        self._default_change_frequency = None
        self.contained_resources = None

    def urlsplit_featureful(url, fully_qualified_parent_url=None, parent_hostname=None):
        # the point of this relatively complex function is to allow for protocol-less,
        # double-slash-prepended-less URLs that still get treated as absolute (i.e.,
        # non-relative) URLs and thus get their hosts parsed correctly by `urlsplit`.
        # This is important because I want to be able to use the host for a number of
        # things behind the scenes.

        is_relative = False
        u = urlsplit(url, "gemini")
        if u.scheme != "gemini":
            return None, None
        if u.hostname is None:
            if url.startswith("/"):
                # process relative link
                if parent_hostname is None:
                    return None, None
                joined = urljoin("gemini://{}".format(parent_hostname), url)
                u = urlsplit(joined, "gemini")
                is_relative = True
            else:  # url does not start with /
                # could be: blah.com/test
                # could be: test
                url_split = url.split("/")
                if is_domain(url_split[0]):
                    # treat schemeless uris as non-gemini as announced in
                    # https://lists.orbitalfox.eu/archives/gemini/2020/003646.html
                    return None, None
                else:
                    # process relative link
                    if fully_qualified_parent_url is None:
                        return None, None
                    joined = urljoin(fully_qualified_parent_url, url)
                    u = urlsplit(joined, "gemini")
                    is_relative = True
        return u, is_relative

    def _get_normalized_host(self):
        if not self.is_valid:
            return None
        if self._normalized_host is None:
            self._normalized_host = self.urlsplit.hostname
        return self._normalized_host

    def _get_normalized_host_like(self):
        if not self.is_valid:
            return None
        if self._normalized_host_like is None:
            normalized_host_like = self.normalized_host
            m = ROOT_LIKE_PATTERN.match(self.urlsplit.path)
            if m:
                normalized_host_like += m[0].rstrip("/")
            self._normalized_host_like = normalized_host_like
        return self._normalized_host_like

    def _get_fetchable_url(self):
        if not self.is_valid:
            return None
        if self._fetchable_url is None:
            # we deliberately do not work with the fragment part
            self._fetchable_url = "{}://{}{}{}{}".format(
                self.urlsplit.scheme, 
                self.urlsplit.hostname, 
                "" if self.urlsplit.port is None or self.urlsplit.port == 1965 else ":{}".format(self.urlsplit.port),
                "/" if self.urlsplit.path == "" else self.urlsplit.path,
                "" if self.urlsplit.query == "" else "?{}".format(self.urlsplit.query))
        return self._fetchable_url

    def _get_is_root_like(self):
        if self._is_root_like is None:
            is_root_like = False
            if (
                self.urlsplit.path == ""
                or self.urlsplit.path == "/"
                or ROOT_LIKE_ONLY_PATTERN.match(self.urlsplit.path)
            ):
                is_root_like = True
            self._is_root_like = is_root_like
        return self._is_root_like

    def _get_is_log_root_like(self):
        if self._is_log_root_like is None:
            is_log_root_like = False
            if (
                self.urlsplit.path == ""
                or self.urlsplit.path == "/"
                or LOG_ROOT_LIKE_PATTERN.match(self.urlsplit.path)
            ):
                is_log_root_like = True
            self._is_log_root_like = is_log_root_like
        return self._is_log_root_like

    def _get_is_log_post_like(self):
        if self._is_log_post_like is None:
            is_log_post_like = False
            post_like_match = LOG_POST_LIKE_PATTERN.match(self.urlsplit.path)
            post_like_exclusion_match = LOG_POST_LIKE_EXCLUSION_PATTERN.match(
                self.urlsplit.path
            )
            post_gemlogblue_match = LOG_POST_GEMLOGBLUE_LIKE_PATTERN.match(
                self.urlsplit.path
            )
            post_boston_match = LOG_POST_BOSTON_LIKE_PATTERN.match(self.urlsplit.path)

            if (
                (post_like_match and not post_like_exclusion_match)
                or (self.normalized_host == "gemlog.blue" and post_gemlogblue_match)
                or (self.normalized_host == "gemini.conman.org" and post_boston_match)
            ):
                is_log_post_like = True
            self._is_log_post_like = is_log_post_like
        return self._is_log_post_like

    def get_default_change_frequency(self, category):
        if not self.is_valid:
            return None
        if self._default_change_frequency is None:
            if category == "content":
                if self.is_root_like or self.is_log_root_like:
                    change_frequency = constants.ROOT_CHANGE_FREQUENCY_DEFAULT
                else:
                    change_frequency = constants.NON_ROOT_CHANGE_FREQUENCY_DEFAULT
            elif category == "binary":
                change_frequency = constants.BINARY_CHANGE_FREQUENCY_DEFAULT
            elif category == "redirect":
                change_frequency = constants.REDIRECT_CHANGE_FREQUENCY_DEFAULT
            elif category == "temp_error":
                change_frequency = constants.TEMP_ERROR_CHANGE_FREQUENCY_DEFAULT
            elif category == "perm_error":
                change_frequency = constants.PERM_ERROR_CHANGE_FREQUENCY_DEFAULT
            elif category == "prompt":
                change_frequency = constants.PROMPT_CHANGE_FREQUENCY_DEFAULT
            else:
                raise Exception.NameError("Unrecognized resource category")

            self._default_change_frequency = change_frequency
        return self._default_change_frequency

    def increment_change_frequency(self, existing_change_frequency, category):
        if category == "content":
            if self.is_root_like or self.is_log_root_like:
                return existing_change_frequency + constants.ROOT_CHANGE_FREQUENCY_INCREMENT
            else:
                return existing_change_frequency + constants.NON_ROOT_CHANGE_FREQUENCY_INCREMENT
        elif category == "binary":
            return existing_change_frequency + constants.BINARY_CHANGE_FREQUENCY_INCREMENT
        elif category == "redirect":
            return existing_change_frequency + constants.REDIRECT_CHANGE_FREQUENCY_INCREMENT
        elif category == "temp_error":
            return existing_change_frequency + constants.TEMP_ERROR_CHANGE_FREQUENCY_INCREMENT
        elif category == "perm_error":
            return existing_change_frequency + constants.PERM_ERROR_CHANGE_FREQUENCY_INCREMENT
        elif category == "prompt":
            return existing_change_frequency + constants.PROMPT_CHANGE_FREQUENCY_INCREMENT
        else:
            raise Exception.NameError("Unrecognized resource category")


    def fetch(self):
        # NB: this intentionally does NOT fetch the normalized URL, because that could
        # cause an infinite loop with, e.g., normalization stripping a trailing slash
        # and a server redirecting to the same URL _with_ a trailing slash.
        return gusmobile.fetch(self.fetchable_url)


    def extract_contained_resources(self, content):
        # this finds all gemini URLs within the content of a given GeminiResource and
        # returns them as a list of new GeminiResources
        if self.contained_resources:
            return self.contained_resources

        link_pattern = r"^=>\s*(\S+)"
        preformat_pattern = r"^```.*?^```"
        content_without_preformat = re.sub(
            preformat_pattern, "", content, flags=re.DOTALL | re.MULTILINE
        )
        probable_urls = re.findall(
            link_pattern, content_without_preformat, re.MULTILINE
        )
        resources = []
        for url in probable_urls:
            resource = GeminiResource(
                url,
                fully_qualified_parent_url=self.fetchable_url,
                parent_hostname=self.urlsplit.hostname.lower(),
            )
            if resource.is_valid:
                resources.append(resource)
        self.contained_resources = resources

        return self.contained_resources

    # constructed from fetchable_url
    # does not matter if quoted or unquoted so I choose arbitrarily to
    # standardize on unquoting it.
    normalized_host = property(_get_normalized_host)
    # constructed from urlsplit, should be quoted.
    fetchable_url = property(_get_fetchable_url)
    # constructed from fetchable_url, should be unquoted.
    is_root_like = property(_get_is_root_like)
    is_log_root_like = property(_get_is_log_root_like)
    is_log_post_like = property(_get_is_log_post_like)
    # pubnix-aware host version, means that the user-specific dir is appended
    normalized_host_like = property(_get_normalized_host_like)