💾 Archived View for gmn.clttr.info › sources › geminispace.info.git › tree › gus › crawl.py.txt captured on 2023-06-14 at 14:31:21.

-=-=-=-=-=-=-
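# Crawler of geminispace.info (GUS): fetches Gemini resources, stores pages,
# page content and links in the search index database, and honours robots.txt
# rules as well as per-host crawl delays.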

import argparse
import random
import logging
import re
from concurrent.futures import ThreadPoolExecutor

from datetime import datetime, timedelta
import os
import pathlib
import time
from urllib.parse import urljoin, uses_relative, uses_netloc

import peewee

from gus.excludes import EXCLUDED_URL_PREFIXES, EXCLUDED_URL_PATHS
from . import constants
from gus.lib.db_model import init_db, Page, PageContent, Link
from gus.lib.gemini import GeminiResource, GeminiRobotFileParser
import gus.lib.logging
from gus.lib.logging import strip_control_chars

# hack: the built-in methods in urllib need to know the
# Gemini protocol exists
uses_relative.append("gemini")
uses_netloc.append("gemini")

CRAWL_DELAYS = {
    "alexschroeder.ch": 5000,
    "communitywiki.org": 5000,
}

EXCLUDED_URL_PATTERN = re.compile(
    r"^gemini://(\d{6}\.ch|almp\d{4}\.app|.*/_(revert|history)/).*",
    flags=re.IGNORECASE
)

def index_binary(resource, response):
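    """Create or update the Page record for a successfully fetched binary
    (non-text) resource, preserving first_seen_at and adjusting the change
    frequency for pages that have been seen before."""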
    logging.debug(
        "Indexing binary for: %s",
        strip_control_chars(resource.fetchable_url),
    )

    doc = {
        "url": resource.fetchable_url,
        "domain": resource.normalized_host,
        "port": resource.urlsplit.port or 1965,
        "content_type": response.content_type,
        "charset": response.charset,
        "size": response.num_bytes,
        "change_frequency": resource.get_default_change_frequency("binary"),
        "last_crawl_at": datetime.utcnow(),
        "last_crawl_success_at": datetime.utcnow(),
        "last_status" : response.status,
        "last_success_status": response.status,
        "last_status_message" : response.error_message,
        "first_seen_at" : datetime.utcnow()
    }
    existing_page = Page.get_or_none(url=resource.fetchable_url)
    if existing_page:
        doc["id"] = existing_page.id
        if not (existing_page.first_seen_at is None):
            doc["first_seen_at"] = existing_page.first_seen_at 
        existing_change_frequency = (
            existing_page.change_frequency
            or resource.get_default_change_frequency("binary")
        )
        doc["change_frequency"] = resource.increment_change_frequency(
            existing_change_frequency, "binary"
        )

    page = Page(**doc)
    try:
        page.save()
    except Exception as e:
        logging.error("Error adding page %s: %s", strip_control_chars(resource.fetchable_url), e)

    return page


def index_redirect(resource, response):
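    """Create or update the Page record for a resource that answered with a
    redirect status, preserving first_seen_at and adjusting the change
    frequency for pages that have been seen before."""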
    logging.debug(
        "Indexing redirect for: %s",
        strip_control_chars(resource.fetchable_url),
    )

    doc = {
        "url": resource.fetchable_url,
        "domain": resource.normalized_host,
        "port": resource.urlsplit.port or 1965,
        "change_frequency": resource.get_default_change_frequency("redirect"),
        "last_crawl_at": datetime.utcnow(),
        "last_crawl_success_at": datetime.utcnow(),
        "last_status" : response.status,
        "last_success_status" : response.status,
        "last_status_message" : response.error_message,
        "first_seen_at" : datetime.utcnow()
    }
    existing_page = Page.get_or_none(url=resource.fetchable_url)
    if existing_page:
        doc["id"] = existing_page.id
        if not (existing_page.first_seen_at is None):
            doc["first_seen_at"] = existing_page.first_seen_at 
        existing_change_frequency = (
            existing_page.change_frequency
            or resource.get_default_change_frequency("redirect")
        )
        doc["change_frequency"] = resource.increment_change_frequency(
            existing_change_frequency, "redirect"
        )

    page = Page(**doc)
    try:
        page.save()
    except Exception as e:
        logging.error("Error adding page %s: %s", strip_control_chars(resource.fetchable_url), e)
    
    return page


def index_error(resource, is_temporary, response):
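    """Record a failed crawl attempt on the Page record. is_temporary selects
    the "temp_error" or "perm_error" change-frequency category; response may
    be None if no response was received at all."""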
    category = "temp_error" if is_temporary else "perm_error"
    default_change_frequency = resource.get_default_change_frequency(category)
    doc = {
        "url": resource.fetchable_url,
        "domain": resource.normalized_host,
        "port": resource.urlsplit.port or 1965,
        "change_frequency": default_change_frequency,
        "last_crawl_at": datetime.utcnow(),
        "last_status" : None if response is None else response.status,
        "last_status_message" : None if response is None else response.error_message
    }
    existing_page = Page.get_or_none(url=resource.fetchable_url)
    if existing_page:
        doc["id"] = existing_page.id
        existing_change_frequency = (
            existing_page.change_frequency or default_change_frequency
        )
        doc["change_frequency"] = resource.increment_change_frequency(
            existing_change_frequency, category
        )
    page = Page(**doc)
    try:
        page.save()
    except Exception as e:
        logging.error("Error adding page %s: %s", strip_control_chars(resource.fetchable_url), e)
    
    return page


def index_prompt(resource, response):
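    """Create or update the Page record for a resource that requests input
    (status 1x) and store the prompt text in the associated PageContent row."""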
    logging.debug(
        "Indexing prompt for: %s",
        strip_control_chars(resource.fetchable_url),
    )

    doc = {
        "url": resource.fetchable_url,
        "domain": resource.normalized_host,
        "port": resource.urlsplit.port or 1965,
        "content_type": "input",
        "charset": response.charset,
        "size": response.num_bytes,
        "change_frequency": resource.get_default_change_frequency("prompt"),
        "last_crawl_at": datetime.utcnow(),
        "last_crawl_success_at": datetime.utcnow(),
        "last_status" : response.status,
        "last_success_status" : response.status,
        "last_status_message" : response.error_message,
        "first_seen_at" : datetime.utcnow()
    }
    existing_page = Page.get_or_none(url=resource.fetchable_url)
    if existing_page:
        doc["id"] = existing_page.id
        if not (existing_page.first_seen_at is None):
            doc["first_seen_at"] = existing_page.first_seen_at 
        existing_change_frequency = (
            existing_page.change_frequency
            or resource.get_default_change_frequency("prompt")
        )
        doc["change_frequency"] = resource.increment_change_frequency(
            existing_change_frequency, "prompt"
        )
    
    page = Page(**doc)
    try:
        page.save()
        content = {
            "page_id": page.id,
            "prompt": response.prompt,
            "content": None
        }
        existing_pagecontent = PageContent.get_or_none(page_id=page.id)
        if existing_pagecontent:
            content["id"] = existing_pagecontent.id

        pagecontent = PageContent(**content)
        pagecontent.save()
    except Exception as e:
        logging.error("Error adding page %s: %s", strip_control_chars(resource.fetchable_url), e)
    
    return page


def index_content(resource, response):
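    """Create or update the Page record for a successfully fetched text
    resource and store its body in PageContent (if it does not exceed
    MAXIMUM_TEXT_PAGE_SIZE). Returns the page and a flag indicating whether
    the content differs from the previously stored version."""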
    logging.debug(
        "Storing content for: %s",
        strip_control_chars(resource.fetchable_url),
    )

    doc = {
        "url": resource.fetchable_url,
        "domain": resource.normalized_host,
        "port": resource.urlsplit.port or 1965,
        "content_type": response.content_type,
        "charset": response.charset,
        "size": response.num_bytes,
        "change_frequency": resource.get_default_change_frequency("content"),
        "last_crawl_at": datetime.utcnow(),
        "last_crawl_success_at": datetime.utcnow(),
        "last_status" : response.status,
        "last_success_status" : response.status,
        "last_status_message" : response.error_message,
        "first_seen_at" : datetime.utcnow()
    }
    if response.content_type == "text/gemini":
        doc["lang"] = (response.lang or "none",)
    existing_page = Page.get_or_none(url=resource.fetchable_url)
    is_different = False
    if existing_page:
        doc["id"] = existing_page.id
        if not (existing_page.first_seen_at is None):
            doc["first_seen_at"] = existing_page.first_seen_at

        existing_pagecontent = PageContent.get_or_none(page_id=existing_page.id)
        is_different = existing_pagecontent is None or response.content != existing_pagecontent.content
        if is_different:
            doc["change_frequency"] = resource.get_default_change_frequency("content")
        else:
            existing_change_frequency = (
                existing_page.change_frequency
                or resource.get_default_change_frequency("content")
            )
            doc["change_frequency"] = resource.increment_change_frequency(
                existing_change_frequency, "content"
            )

    page = Page(**doc)
    try:
        page.save()
        if response.num_bytes <= constants.MAXIMUM_TEXT_PAGE_SIZE:
            content = {
                "page_id": page.id,
                "prompt": None,
                "content": response.content
            }
           
            existing_pagecontent = PageContent.get_or_none(page_id=page.id)
            if existing_pagecontent:
                content["id"] = existing_pagecontent.id
            
            pagecontent = PageContent(**content)
            pagecontent.save()
    except Exception as e:
        logging.error("Error adding page %s: %s", strip_control_chars(resource.fetchable_url), e)
    
    return page, is_different


def should_skip(resource):
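    """Return True if the resource matches one of the excluded URL prefixes,
    excluded path suffixes or the exclusion regex, or if the check itself
    fails."""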
    should_skip = False
    try:
        for excluded_prefix in EXCLUDED_URL_PREFIXES:
            if resource.fetchable_url.startswith(excluded_prefix):
                should_skip = True
                break
        for excluded_path in EXCLUDED_URL_PATHS:
            if resource.urlsplit.path.lower().endswith(excluded_path):
                should_skip = True
                break
        m = EXCLUDED_URL_PATTERN.match(resource.fetchable_url)
        if m:
            should_skip = True
    except Exception as e:
        logging.error("Error checking for exclude of url %s: %s", strip_control_chars(resource.raw_url), e)
        should_skip = True

    return should_skip


def index_links(from_resource, contained_resources):
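    """Replace the stored outgoing links of from_resource with links to the
    given contained_resources, skipping excluded targets and creating
    placeholder Page rows for targets that have not been seen before."""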
    from_page, created = Page.get_or_create(url=from_resource.fetchable_url)
   
    ## first delete all links that this page has had before,
    ## then add the new links
    try:
        Link.delete().where(Link.from_page == from_page).execute()
    except Exception as e:
        logging.error("Error deleting existing links of %s: %s", strip_control_chars(from_resource.fetchable_url), e)
    data = []
    for cr in contained_resources:
        if should_skip(cr):
            continue
        to_page = Page.get_or_none(url=cr.fetchable_url)
        if not to_page:
            to_page = Page.create(
                url=cr.fetchable_url,
                domain=cr.normalized_host,
                port=cr.urlsplit.port or 1965,
                first_seen_at=datetime.utcnow()
            )
        data.append(
            {
                "from_page": from_page,
                "to_page": to_page,
                "is_cross_host_like": Link.get_is_cross_host_like(from_resource, cr),
            }
        )
    try:
        Link.insert_many(data).execute()
    except Exception as e:
        logging.error("Error insert links: %s",e) 


def fetch_robots_file(robot_host):
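    """Fetch and parse gemini://<robot_host>/robots.txt."""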
    robot_url = urljoin("gemini://{}".format(robot_host), "/robots.txt")
    logging.info(
        "Fetching robots file: %s", strip_control_chars(robot_url)
    )
    rp = GeminiRobotFileParser(robot_url)
    rp.read()
    return rp


def get_robots_file(robot_host):
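    """Return the parsed robots.txt for robot_host, fetching it on first use
    and caching it in robot_file_map."""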
    if robot_host not in robot_file_map:
        robot_file_map[robot_host] = fetch_robots_file(robot_host)
    return robot_file_map[robot_host]


def crawl_page(
    gemini_resource, current_depth, redirect_chain=[]
):
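    """Crawl a single Gemini resource and recurse into the resources it links
    to. Skips invalid, excluded, recently crawled or robots.txt-blocked URLs
    and hosts with too many failed requests, throttles requests per host, and
    dispatches on the response status (error, redirect, prompt, success)."""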
    gr = gemini_resource
    url = gr.fetchable_url
    if not gemini_resource.is_valid:
        logging.warning(
            "Not a valid gemini resource, skipping: %s",
            strip_control_chars(gemini_resource.url),
        )
        return
    if max_crawl_depth >= 0 and current_depth > max_crawl_depth:
        logging.warning(
            "Going too deep, skipping: %s", strip_control_chars(url)
        )
        return
    if should_skip(gr):
        logging.debug(
            "URL is excluded, skipping: %s",
            strip_control_chars(url),
        )
        return
    if gr.normalized_host in failure_count and failure_count[gr.normalized_host] > constants.MAXIMUM_FAILED_REQUEST_COUNT:
        logging.debug(
            "Too many failed requests for host, skipping: %s", strip_control_chars(url)
        )
        return
    
    existing_page = Page.get_or_none(url=gr.fetchable_url)
    if existing_page and existing_page.change_frequency is not None:
        most_recent_crawl = existing_page.last_crawl_at
        if most_recent_crawl and datetime.utcnow() < most_recent_crawl + timedelta(
            hours=existing_page.change_frequency):
            logging.debug(
                "Too soon to recrawl, skipping: %s",
                strip_control_chars(gr.fetchable_url),
            )
            return

    # ROBOTS
    robots_file = get_robots_file(gr.normalized_host)
    crawl_delay = None
    if robots_file is not None:
        logging.debug("Found robots.txt for %s", gr.fetchable_url)
        # only fetch if allowed for a matching user-agent:
        # in priority order "gus" > "indexer" > "*"
        can_fetch = robots_file.can_fetch_prioritized(["gus", "indexer", "*"], gr.fetchable_url)

        # honour any crawl delay declared for the "indexer" user-agent
        crawl_delay = robots_file.crawl_delay("indexer")

        if not can_fetch:
            logging.debug(
                "Blocked by robots.txt, skipping: %s",
                strip_control_chars(url),
            )
            return

    # crawl delay
    if gr.normalized_host in domain_hit_timings:
        if gr.normalized_host in CRAWL_DELAYS:
            next_allowed_hit = domain_hit_timings[gr.normalized_host] + timedelta(
                milliseconds=CRAWL_DELAYS[gr.normalized_host]
            )
        elif not crawl_delay:
            next_allowed_hit = domain_hit_timings[gr.normalized_host] + timedelta(
                milliseconds=300
            )
        else:
            next_allowed_hit = domain_hit_timings[gr.normalized_host] + timedelta(
                milliseconds=crawl_delay
            )
        sleep_duration = max((next_allowed_hit - datetime.utcnow()).total_seconds(), 0)
        time.sleep(sleep_duration)
    domain_hit_timings[gr.normalized_host] = datetime.utcnow()

    # Actually fetch!
    logging.info("Fetching resource: %s", strip_control_chars(url))
    if gr.fully_qualified_parent_url is not None:
        logging.debug(
            "with parent: %s",
            strip_control_chars(gr.fully_qualified_parent_url),
        )
    response = gr.fetch()

    if response is None:
        # problem before getting a response
        logging.warn("Failed to fetch: %s", strip_control_chars(url))
        page = index_error(gr, True, None)
        
        failure_count[gr.normalized_host] = failure_count[gr.normalized_host] + 1 if gr.normalized_host in failure_count else 1
        logging.warn("Failed request count for host %s is %d", gr.normalized_host, failure_count[gr.normalized_host])
        return

    failure_count[gr.normalized_host] = 0
    if response.status.startswith("4"):
        # temporary error status
        logging.debug(
            "Got temporary error: %s: %s %s",
            strip_control_chars(url),
            response.status,
            response.error_message,
        )
        page = index_error(gr, True, response)

    elif response.status.startswith("5"):
        # permanent error status
        logging.debug(
            "Got permanent error: %s: %s %s",
            strip_control_chars(url),
            response.status,
            response.error_message,
        )
        page = index_error(gr, False, response)

    elif response.status.startswith("3"):
        # redirect status
        logging.debug(
            "Got redirected: %s: %s %s",
            strip_control_chars(url),
            response.status,
            response.url,
        )
        if len(redirect_chain) > constants.MAXIMUM_REDIRECT_CHAIN_LENGTH:
            logging.info(
                "Aborting, maximum redirect chain length reached: %s",
                strip_control_chars(url),
            )
            return
        redirect_resource = GeminiResource(
            response.url, gr.fetchable_url, gr.normalized_host
        )
        if redirect_resource.fetchable_url == gr.fetchable_url:
            logging.info(
                "Aborting, redirecting to self: %s",
                strip_control_chars(url),
            )
            return
        page = index_redirect(gr, response)
        index_links(gr, [redirect_resource])
        try:
            crawl_page(redirect_resource, current_depth,
                redirect_chain=redirect_chain + [gr.fetchable_url],)
        except Exception as e:
            logging.error("Failed to crawl outdated resource %s with error: %s", redirect_resource.fetchable_url, e)

    elif response.status.startswith("1"):
        # input status
        logging.debug(
            "Input requested at: %s: %s %s",
            strip_control_chars(url),
            response.status,
            response.prompt,
        )
        page = index_prompt(gr, response)
    elif response.status.startswith("2"):
        # success status
        logging.debug(
            "Successful request: %s: %s %s",
            strip_control_chars(url),
            response.status,
            response.content_type,
        )
        if response.content_type.startswith("text/"):
            page, is_different = index_content(gr, response)
            if response.content_type != "text/gemini":
                logging.debug(
                    "Content is not gemini text: %s: %s",
                    strip_control_chars(url),
                    response.content_type,
                )
            else:
                logging.debug(
                    "Got gemini text, extracting and crawling links: %s",
                    strip_control_chars(url),
                )
                contained_resources = gr.extract_contained_resources(response.content)
                index_links(gr, contained_resources)
                for resource in contained_resources:
                    try:
                        crawl_page(resource, current_depth + 1)
                    except Exception as e:
                        logging.error("Failed to crawl outdated resource %s with error: %s", resource.fetchable_url, e)
        else:
            page = index_binary(gr, response)
    else:
        logging.warning(
            "Got unhandled status: %s: %s",
            strip_control_chars(url),
            response.status,
        )


def load_expired_urls():
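    """Return the URLs of all pages that are due for a recrawl: pages whose
    change_frequency (in hours) has elapsed since last_crawl_at, or that have
    never been crawled. The REPLACE('fnord hours', ...) expression builds the
    SQLite interval modifier (e.g. "24 hours") from the change_frequency
    column."""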
    expired_pages = Page.raw(
           """SELECT p.url
             FROM page as p
             WHERE datetime(last_crawl_at, REPLACE('fnord hours', 'fnord', change_frequency)) < datetime('now') OR last_crawl_at IS NULL""" )
    return [page.url for page in expired_pages.execute()]
#    expired_pages = Page.select(Page.url).where(Page.last_crawl_at < (datetime.utcnow() - timedelta(hours=Page.change_frequency)) & Page.last_crawl_at.is_null(True))
#    return expired_pages

def load_seed_request_urls():
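    """Read user-submitted seed URLs from seed-requests.txt, one per line."""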
    with open("seed-requests.txt") as f:
        content = f.readlines()
    # remove whitespace characters like `\n` at the end of each line
    content = [x.strip() for x in content]
    return content


def crawl_resource(resource):
    crawl_page(resource, 0)


def run_crawl(should_run_destructive=False, seed_urls=[]):
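    """Initialize the crawl state (database, robots.txt cache, per-host hit
    timings and failure counts), then crawl all expired pages followed by the
    submitted seed requests, each with a small thread pool."""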
    global index_dir
    index_dir = constants.INDEX_DIR_NEW if should_run_destructive else constants.INDEX_DIR
    pathlib.Path(index_dir).mkdir(parents=True, exist_ok=True)
    global db
    db = init_db(f"{index_dir}/{constants.DB_FILENAME}")
    global robot_file_map
    robot_file_map = {}
    global domain_hit_timings
    domain_hit_timings = {}
    global max_crawl_depth
    max_crawl_depth = 700

    global failure_count
    failure_count = {}

    expired_resources = [GeminiResource(url) for url in load_expired_urls()]
    random.shuffle(expired_resources)
    with ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(crawl_resource, expired_resources)
    executor.shutdown(wait=True, cancel_futures=False)
    
    submitted_resources = [GeminiResource(url) for url in load_seed_request_urls()]
    random.shuffle(submitted_resources)
    with ThreadPoolExecutor(max_workers=3) as executor:
        executor.map(crawl_resource, submitted_resources)
    executor.shutdown(wait=True, cancel_futures=False)

    logging.info("Finished!")


def main():
    args = parse_args()
    gus.lib.logging.handle_arguments(args)

    run_crawl(args.should_run_destructive, seed_urls=args.seed_urls)


def parse_args():
    parser = argparse.ArgumentParser(description="Crawl Geminispace.")
    parser.add_argument(
        "--destructive",
        "-d",
        dest="should_run_destructive",
        action="store_true",
        default=False,
        help="create a fresh index and perform a full Geminispace crawl",
    )
    parser.add_argument(
        "--seeds",
        "-s",
        metavar="URL",
        dest="seed_urls",
        nargs="+",
        default=[],
        help="one or more URLs with which to extend the seeds of the crawl",
    )
    gus.lib.logging.add_arguments(parser)
    args = parser.parse_args()
    return args

if __name__ == "__main__":
    main()