💾 Archived View for gmn.clttr.info › sources › geminispace.info.git › tree › gus › crawl.py.txt captured on 2024-02-05 at 10:05:35.
⬅️ Previous capture (2023-09-08)
-=-=-=-=-=-=-
import argparse import random import logging import re from concurrent.futures import ThreadPoolExecutor from datetime import datetime, timedelta import os import pathlib import time from urllib.parse import urljoin, uses_relative, uses_netloc import peewee from gus.excludes import EXCLUDED_URL_PREFIXES, EXCLUDED_URL_PATHS from . import constants from gus.lib.db_model import init_db, Page, PageContent, Link from gus.lib.gemini import GeminiResource, GeminiRobotFileParser import gus.lib.logging from gus.lib.logging import strip_control_chars # hack: the built-in methods in urllib need to know the # Gemini protocol exists uses_relative.append("gemini") uses_netloc.append("gemini") CRAWL_DELAYS = { "alexschroeder.ch": 5000, "communitywiki.org": 5000, } EXCLUDED_URL_PATTERN = re.compile( r"^gemini://(\d{6}\.ch|almp\d{4}\.app|.*/_(revert|history)/).*", flags=re.IGNORECASE ) def index_binary(resource, response): logging.debug( "Indexing binary for: %s", strip_control_chars(resource.fetchable_url), ) doc = { "url": resource.fetchable_url, "domain": resource.normalized_host, "port": resource.urlsplit.port or 1965, "content_type": response.content_type, "charset": response.charset, "size": response.num_bytes, "change_frequency": resource.get_default_change_frequency("binary"), "last_crawl_at": datetime.utcnow(), "last_crawl_success_at": datetime.utcnow(), "last_status" : response.status, "last_success_status": response.status, "last_status_message" : response.error_message, "first_seen_at" : datetime.utcnow() } existing_page = Page.get_or_none(url=resource.fetchable_url) if existing_page: doc["id"] = existing_page.id if not (existing_page.first_seen_at is None): doc["first_seen_at"] = existing_page.first_seen_at existing_change_frequency = ( existing_page.change_frequency or resource.get_default_change_frequency("binary") ) doc["change_frequency"] = resource.increment_change_frequency( existing_change_frequency, "binary" ) page = Page(**doc) try: page.save() except: logging.error("Error adding page: %s", strip_control_chars(resource.fetchable_url)) return page def index_redirect(resource, response): logging.debug( "Indexing redirect for: %s", strip_control_chars(resource.fetchable_url), ) doc = { "url": resource.fetchable_url, "domain": resource.normalized_host, "port": resource.urlsplit.port or 1965, "change_frequency": resource.get_default_change_frequency("redirect"), "last_crawl_at": datetime.utcnow(), "last_crawl_success_at": datetime.utcnow(), "last_status" : response.status, "last_success_status" : response.status, "last_status_message" : response.error_message, "first_seen_at" : datetime.utcnow() } existing_page = Page.get_or_none(url=resource.fetchable_url) if existing_page: doc["id"] = existing_page.id if not (existing_page.first_seen_at is None): doc["first_seen_at"] = existing_page.first_seen_at existing_change_frequency = ( existing_page.change_frequency or resource.get_default_change_frequency("redirect") ) doc["change_frequency"] = resource.increment_change_frequency( existing_change_frequency, "redirect" ) page = Page(**doc) try: page.save() except: logging.error("Error adding page: %s", strip_control_chars(resource.fetchable_url)) return page def index_error(resource, is_temporary, response): category = "temp_error" if is_temporary else "perm_error" default_change_frequency = resource.get_default_change_frequency(category) doc = { "url": resource.fetchable_url, "domain": resource.normalized_host, "port": resource.urlsplit.port or 1965, "change_frequency": default_change_frequency, "last_crawl_at": datetime.utcnow(), "last_status" : None if response is None else response.status, "last_status_message" : None if response is None else response.error_message } existing_page = Page.get_or_none(url=resource.fetchable_url) if existing_page: doc["id"] = existing_page.id existing_change_frequency = ( existing_page.change_frequency or default_change_frequency ) doc["change_frequency"] = resource.increment_change_frequency( existing_change_frequency, category ) page = Page(**doc) try: page.save() except: logging.error("Error adding page: %s", strip_control_chars(resource.fetchable_url)) return page def index_prompt(resource, response): logging.debug( "Indexing prompt for: %s", strip_control_chars(resource.fetchable_url), ) doc = { "url": resource.fetchable_url, "domain": resource.normalized_host, "port": resource.urlsplit.port or 1965, "content_type": "input", "charset": response.charset, "size": response.num_bytes, "change_frequency": resource.get_default_change_frequency("prompt"), "last_crawl_at": datetime.utcnow(), "last_crawl_success_at": datetime.utcnow(), "last_status" : response.status, "last_success_status" : response.status, "last_status_message" : response.error_message, "first_seen_at" : datetime.utcnow() } existing_page = Page.get_or_none(url=resource.fetchable_url) if existing_page: doc["id"] = existing_page.id if not (existing_page.first_seen_at is None): doc["first_seen_at"] = existing_page.first_seen_at existing_change_frequency = ( existing_page.change_frequency or resource.get_default_change_frequency("prompt") ) doc["change_frequency"] = resource.increment_change_frequency( existing_change_frequency, "prompt" ) page = Page(**doc) try: page.save() content = { "page_id": page.id, "prompt": response.prompt, "content": None } existing_pagecontent = PageContent.get_or_none(page_id=page.id) if existing_pagecontent: content["id"] = existing_pagecontent.id pagecontent = PageContent(**content) pagecontent.save() except: logging.error("Error adding page: %s", strip_control_chars(resource.fetchable_url)) return page def index_content(resource, response): logging.debug( "Storing content for: %s", strip_control_chars(resource.fetchable_url), ) doc = { "url": resource.fetchable_url, "domain": resource.normalized_host, "port": resource.urlsplit.port or 1965, "content_type": response.content_type, "charset": response.charset, "size": response.num_bytes, "change_frequency": resource.get_default_change_frequency("content"), "last_crawl_at": datetime.utcnow(), "last_crawl_success_at": datetime.utcnow(), "last_status" : response.status, "last_success_status" : response.status, "last_status_message" : response.error_message, "first_seen_at" : datetime.utcnow() } if response.content_type == "text/gemini": doc["lang"] = (response.lang or "none",) existing_page = Page.get_or_none(url=resource.fetchable_url) is_different = False if existing_page: doc["id"] = existing_page.id if not (existing_page.first_seen_at is None): doc["first_seen_at"] = existing_page.first_seen_at existing_pagecontent = PageContent.get_or_none(page_id=existing_page.id) is_different = existing_pagecontent is None or response.content != existing_pagecontent.content if is_different: doc["change_frequency"] = resource.get_default_change_frequency("content") else: existing_change_frequency = ( existing_page.change_frequency or resource.get_default_change_frequency("content") ) doc["change_frequency"] = resource.increment_change_frequency( existing_change_frequency, "content" ) page = Page(**doc) try: page.save() if response.num_bytes <= constants.MAXIMUM_TEXT_PAGE_SIZE: content = { "page_id": page.id, "prompt": None, "content": response.content } existing_pagecontent = PageContent.get_or_none(page_id=page.id) if existing_pagecontent: content["id"] = existing_pagecontent.id pagecontent = PageContent(**content) pagecontent.save() except Exception as e: logging.error("Error adding page %s: %s", strip_control_chars(resource.fetchable_url), e) return page, is_different def should_skip(resource): should_skip = False try: for excluded_prefix in EXCLUDED_URL_PREFIXES: if resource.fetchable_url.startswith(excluded_prefix): should_skip = True break for excluded_path in EXCLUDED_URL_PATHS: if resource.urlsplit.path.lower().endswith(excluded_path): should_skip = True break m = EXCLUDED_URL_PATTERN.match(resource.fetchable_url) if m: should_skip = True except: logging.error("Error checking for exclude of url: %s", strip_control_chars(resource.raw_url)) should_skip = True return should_skip def index_links(from_resource, contained_resources): from_page, created = Page.get_or_create(url=from_resource.fetchable_url) ## first delete all links that this page as had before ## than add new links try: Link.delete().where(Link.from_page == from_page).execute() except: logging.error("Error deleting a link: %s", Link.from_page) data = [] for cr in contained_resources: if should_skip(cr): continue to_page = Page.get_or_none(url=cr.fetchable_url) if not to_page: to_page = Page.create( url=cr.fetchable_url, domain=cr.normalized_host, port=cr.urlsplit.port or 1965, first_seen_at=datetime.utcnow() ) data.append( { "from_page": from_page, "to_page": to_page, "is_cross_host_like": Link.get_is_cross_host_like(from_resource, cr), } ) try: Link.insert_many(data).execute() except Exception as e: logging.error("Error insert links: %s",e) def fetch_robots_file(robot_host): robot_url = urljoin("gemini://{}/".format(robot_host), "robots.txt") logging.info( "Fetching robots file: %s", strip_control_chars(robot_url) ) rp = GeminiRobotFileParser(robot_url) rp.read() return rp def get_robots_file(robot_host): logging.debug("Looking for robots file for host: %s", robot_host) if robot_host not in robot_file_map: robot_file_map[robot_host] = fetch_robots_file(robot_host) return robot_file_map[robot_host] def crawl_page( gemini_resource, current_depth, redirect_chain=[] ): gr = gemini_resource url = gr.fetchable_url if not gemini_resource.is_valid: logging.warn( "Not a valid gemini resource, skipping: %s", strip_control_chars(gemini_resource.url), ) return if max_crawl_depth >= 0 and current_depth > max_crawl_depth: logging.warn( "Going too deep, skipping: %s", strip_control_chars(url) ) return if should_skip(gr): logging.debug( "URL is excluded, skipping: %s", strip_control_chars(url), ) return if gr.normalized_host in failure_count and failure_count[gr.normalized_host] > constants.MAXIMUM_FAILED_REQUEST_COUNT: logging.debug( "Too many failed requests for host, skipping: %s", strip_control_chars(url) ) return existing_page = Page.get_or_none(url=gr.fetchable_url) if existing_page and existing_page.change_frequency is not None: most_recent_crawl = existing_page.last_crawl_at if most_recent_crawl and datetime.utcnow() < most_recent_crawl + timedelta( hours=existing_page.change_frequency): logging.debug( "Too soon to recrawl, skipping: %s", strip_control_chars(gr.fetchable_url), ) return # ROBOTS # use the normalized_host_like to fetch user-specific robots.txt of pubnixes robots_file = get_robots_file(gr.normalized_host_like) crawl_delay = None if robots_file is not None: logging.debug("Found robots.txt for URI: %s", gr.fetchable_url) # only fetch if allowed for a matching user-agent: # in priority order "gus" > "indexer" > "*" can_fetch = robots_file.can_fetch_prioritized(["gus", "indexer", "*"], gr.fetchable_url) # same approach as above - last value wins crawl_delay = robots_file.crawl_delay("indexer") if not can_fetch: logging.debug( "Blocked by robots.txt, skipping: %s", strip_control_chars(url), ) return # crawl delay if gr.normalized_host in domain_hit_timings: if gr.normalized_host in CRAWL_DELAYS: next_allowed_hit = domain_hit_timings[gr.normalized_host] + timedelta( milliseconds=CRAWL_DELAYS[gr.normalized_host] ) elif not crawl_delay: next_allowed_hit = domain_hit_timings[gr.normalized_host] + timedelta( milliseconds=300 ) else: next_allowed_hit = domain_hit_timings[gr.normalized_host] + timedelta( milliseconds=crawl_delay ) sleep_duration = max((next_allowed_hit - datetime.utcnow()).total_seconds(), 0) time.sleep(sleep_duration) domain_hit_timings[gr.normalized_host] = datetime.utcnow() # Actually fetch! logging.info("Fetching resource: %s", strip_control_chars(url)) if gr.fully_qualified_parent_url is not None: logging.debug( "with parent: %s", strip_control_chars(gr.fully_qualified_parent_url), ) response = gr.fetch() if response is None: # problem before getting a response logging.warn("Failed to fetch: %s", strip_control_chars(url)) page = index_error(gr, True, None) failure_count[gr.normalized_host] = failure_count[gr.normalized_host] + 1 if gr.normalized_host in failure_count else 1 logging.warn("Failed request count for host %s is %d", gr.normalized_host, failure_count[gr.normalized_host]) return failure_count[gr.normalized_host] = 0 if response.status.startswith("4"): # temporary error status logging.debug( "Got temporary error: %s: %s %s", strip_control_chars(url), response.status, response.error_message, ) page = index_error(gr, True, response) elif response.status.startswith("5"): # permanent error status logging.debug( "Got permanent error: %s: %s %s", strip_control_chars(url), response.status, response.error_message, ) page = index_error(gr, False, response) elif response.status.startswith("3"): # redirect status logging.debug( "Got redirected: %s: %s %s", strip_control_chars(url), response.status, response.url, ) if len(redirect_chain) > constants.MAXIMUM_REDIRECT_CHAIN_LENGTH: logging.info( "Aborting, maximum redirect chain length reached: %s", strip_control_chars(url), ) return redirect_resource = GeminiResource( response.url, gr.fetchable_url, gr.normalized_host ) if redirect_resource.fetchable_url == gr.fetchable_url: logging.info( "Aborting, redirecting to self: %s", strip_control_chars(url), ) return page = index_redirect(gr, response) index_links(gr, [redirect_resource]) try: crawl_page(redirect_resource, current_depth, redirect_chain=redirect_chain + [gr.fetchable_url],) except Exception as e: logging.error("Failed to crawl outdated resource %s with error: %s", redirect_resource.fetchable_url, e) elif response.status.startswith("1"): # input status logging.debug( "Input requested at: %s: %s %s", strip_control_chars(url), response.status, response.prompt, ) page = index_prompt(gr, response) elif response.status.startswith("2"): # success status logging.debug( "Successful request: %s: %s %s", strip_control_chars(url), response.status, response.content_type, ) if response.content_type.startswith("text/"): page, is_different = index_content(gr, response) if response.content_type != "text/gemini": logging.debug( "Content is not gemini text: %s: %s", strip_control_chars(url), response.content_type, ) else: logging.debug( "Got gemini text, extracting and crawling links: %s", strip_control_chars(url), ) contained_resources = gr.extract_contained_resources(response.content) index_links(gr, contained_resources) for resource in contained_resources: try: crawl_page(resource, current_depth + 1) except Exception as e: logging.error("Failed to crawl outdated resource %s with error: %s", resource.fetchable_url, e) else: page = index_binary(gr, response) else: logging.warn( "Got unhandled status: %s: %s", strip_control_chars(url), response.status, ) def load_expired_urls(): expired_pages = Page.raw( """SELECT p.url FROM page as p WHERE datetime(last_crawl_at, REPLACE('fnord hours', 'fnord', change_frequency)) < datetime('now') OR last_crawl_at IS NULL""" ) return [page.url for page in expired_pages.execute()] def load_seed_request_urls(): with open("seed-requests.txt") as f: content = f.readlines() # remove whitespace characters like `\n` at the end of each line content = [x.strip() for x in content] return content def crawl_resource(resource): crawl_page(resource, 0) def run_crawl(should_run_destructive=False, seed_urls=[]): global index_dir index_dir = constants.INDEX_DIR_NEW if should_run_destructive else constants.INDEX_DIR pathlib.Path(index_dir).mkdir(parents=True, exist_ok=True) global db db = init_db(f"{index_dir}/{constants.DB_FILENAME}") global robot_file_map robot_file_map = {} global domain_hit_timings domain_hit_timings = {} global max_crawl_depth max_crawl_depth = 700 global failure_count failure_count = {} expired_resources = [GeminiResource(url) for url in load_expired_urls()] random.shuffle(expired_resources) with ThreadPoolExecutor(max_workers=3) as executor: executor.map(crawl_resource, expired_resources) executor.shutdown(wait=True, cancel_futures=False) submitted_resources = [GeminiResource(url) for url in load_seed_request_urls()] random.shuffle(submitted_resources) with ThreadPoolExecutor(max_workers=3) as executor: executor.map(crawl_resource, submitted_resources) executor.shutdown(wait=True, cancel_futures=False) logging.info("Finished!") def main(): args = parse_args() gus.lib.logging.handle_arguments(args) run_crawl(args.should_run_destructive, seed_urls=args.seed_urls) def parse_args(): parser = argparse.ArgumentParser(description="Crawl Geminispace.") parser.add_argument( "--destructive", "-d", dest="should_run_destructive", action="store_true", default=False, help="create a fresh index and perform a full Geminispace crawl", ) parser.add_argument( "--seeds", "-s", metavar="URL", dest="seed_urls", nargs="+", default=[], help="one or more URLs with which to extend the seeds of the crawl", ) gus.lib.logging.add_arguments(parser) args = parser.parse_args() return args if __name__ == "__main__": main()