"""URL handling for the Gemini protocol: robots.txt parsing and resource
normalization/classification helpers used by the GUS crawler."""

import re
from urllib.parse import (
    quote,
    unquote,
    urljoin,
    urlparse,
    urlsplit,
    urlunparse,
    urlunsplit,
    uses_relative,
    uses_netloc,
)
from urllib.robotparser import RobotFileParser

import gusmobile

from gus import constants
from gus.lib.domain import is_domain

# hack: the built-in methods in urllib need to know the
# Gemini protocol exists
uses_relative.append("gemini")
uses_netloc.append("gemini")

# NB: all patterns below are raw strings; the originals used plain strings
# with \d/\s/\w etc., which are invalid escape sequences (SyntaxWarning in
# modern CPython). The string values are unchanged.

# Paths that look like the index page of a gemlog (blog).
LOG_ROOT_LIKE_PATTERN = re.compile(
    r".*/(gemlog|glog|journal|twinlog|posts|post|tangents|phlog|starlog|pikkulog|blog|log)/?$",
    flags=re.IGNORECASE,
)
# Paths that look like an individual gemlog post: under a log-like directory,
# or containing a YYYY-MM-DD / YYYYMMDD style date.
LOG_POST_LIKE_PATTERN = re.compile(
    r".*/((gemlog|glog|journal|twinlog|posts|post|tangents|phlog|starlog|pikkulog|blog|log)/.+$|.*/\d{4}[-_]\d{2}[-_]\d{2}.*$|.*/(19|20)\d{6}.*$)",
    flags=re.IGNORECASE,
)
# Paths matching LOG_POST_LIKE_PATTERN that are nonetheless not posts
# (archives, feeds, index pages, etc.).
LOG_POST_LIKE_EXCLUSION_PATTERN = re.compile(
    r".*/(games|archive|archives|rss|handlers|diagnostics)/.*|.*atom.xml$|.*gemlog.gmi$|.*index.gmi$|.*index.gemini$",
    flags=re.IGNORECASE,
)
# Site-specific post layouts for gemlog.blue and gemini.conman.org.
LOG_POST_GEMLOGBLUE_LIKE_PATTERN = re.compile(
    r"^/users/[a-z][-a-z0-9]*/\d+\.gmi?", flags=re.IGNORECASE
)
LOG_POST_BOSTON_LIKE_PATTERN = re.compile(
    r"^/boston/\d{4}/\d{2}/\d+\.\d+", flags=re.IGNORECASE
)
# Paths that are exactly a user's root (~user, /users/user, /users).
ROOT_LIKE_ONLY_PATTERN = re.compile(
    r"^/(~[a-z][-a-z0-9]*|users/[a-z][-a-z0-9]*|users)/?$", flags=re.IGNORECASE
)
# Paths that begin with a user's root (prefix match).
ROOT_LIKE_PATTERN = re.compile(
    r"^/(~[a-z][-a-z0-9]*|users/[a-z][-a-z0-9]*|users)/?", flags=re.IGNORECASE
)
# Extract an author name from a ~user or /users/user path.
AUTHOR_URL_PATTERN = re.compile(
    r"^/~([a-z][-a-z0-9]*)/|^/users/~?([a-z][-a-z0-9]*)", flags=re.IGNORECASE
)
# Extract an author name from "by: ..." / "author: ..." lines in page content.
AUTHOR_CONTENT_PATTERN = re.compile(
    r".*(by|author): ([\w\s\d]+)", flags=re.IGNORECASE | re.MULTILINE
)
# First "# ..." heading in gemtext content.
TITLE_CONTENT_PATTERN = re.compile(r"^#\s(.*)$", flags=re.IGNORECASE | re.MULTILINE)
# Derive a title from the final path component, stripping an optional
# leading date and the file extension.
TITLE_URL_PATTERN = re.compile(
    r".*/(\d{8}[-_]|\d{4}[-_]\d{2}[-_]\d{2}[-_])?([a-z0-9-_]+)(\.[a-z0-9]+)$",
    flags=re.IGNORECASE,
)


class GeminiRobotFileParser(RobotFileParser):
    """RobotFileParser subclass that fetches robots.txt over Gemini and
    supports a prioritized list of user-agents."""

    def set_url(self, url):
        """Sets the URL referring to a robots.txt file."""
        self.url = url
        u, _ = GeminiResource.urlsplit_featureful(url)
        # u[1:3] is (netloc, path) from the urlsplit result
        self.host, self.path = u[1:3]

    def read(self):
        """Reads the robots.txt URL and feeds it to the parser.

        A missing response, a non-2x status, or a non-text/plain body all
        mean "no usable robots.txt", so everything is allowed.
        """
        gr = GeminiResource(self.url)
        response = gr.fetch()
        if response is None:
            self.allow_all = True
            return
        if not response.status.startswith("2") or not response.content_type == "text/plain":
            self.allow_all = True
        else:
            self.parse(response.content.splitlines())

    def read_from_string(self, robots_txt):
        """An utility method for writing tests"""
        self.parse(robots_txt.splitlines())

    def can_fetch_prioritized(self, useragents, url):
        """Given a url and prioritized list of user-agents, is fetching allowed?

        Priority is with the highest priority first; eg.
        ["ThisIndexerBot", "generic-indexer", "generic-bot", "*"].
        """
        if self.allow_all:
            return True
        if self.disallow_all:
            return False
        # last_checked is only set once a robots.txt has been processed;
        # without it we conservatively disallow.
        if not self.last_checked:
            return False

        # Reduce the URL to its path/params/query/fragment, re-quoted, so it
        # matches the form used in robots.txt rules.
        parsed_url = urlparse(unquote(url))
        url = urlunparse(('', '', parsed_url.path, parsed_url.params, parsed_url.query, parsed_url.fragment))
        url = quote(url) or "/"

        def useragent_allowed(useragent):
            # Returns True/False if any entry applies to this agent, else None.
            for entry in self.entries:
                if entry.applies_to(useragent):
                    return entry.allowance(url)
            return None

        # map user-agents to allowances; the first non-None will be the prioritized allowance
        for ua in useragents:
            allowed = useragent_allowed(ua)
            if allowed is not None:
                return allowed

        # if none of the user-agents match, check default entry
        if self.default_entry:
            return self.default_entry.allowance(url)

        # if nothing matches, crawling is allowed
        return True


class GeminiResource:
    """A (possibly relative) Gemini URL plus lazily-computed, cached views of
    it: normalized/fetchable forms and blog-related classifications."""

    def __init__(self, url, fully_qualified_parent_url=None, parent_hostname=None):
        self.raw_url = url
        self.urlsplit, self.is_relative = GeminiResource.urlsplit_featureful(
            url,
            fully_qualified_parent_url=fully_qualified_parent_url,
            parent_hostname=parent_hostname,
        )
        # urlsplit_featureful returns (None, None) for non-Gemini/unresolvable URLs
        self.is_valid = self.urlsplit is not None
        self.fully_qualified_parent_url = fully_qualified_parent_url
        # caches for the lazy properties below; None means "not computed yet"
        self._normalized_url = None
        self._normalized_host = None
        self._normalized_host_like = None
        self._fetchable_url = None
        self._is_root_like = None
        self._is_log_root_like = None
        self._is_log_post_like = None
        self._default_change_frequency = None
        self.contained_resources = None

    @staticmethod
    def urlsplit_featureful(url, fully_qualified_parent_url=None, parent_hostname=None):
        """Split `url`, resolving relative links against the parent context.

        Returns (SplitResult, is_relative), or (None, None) when the URL is
        not Gemini or cannot be resolved.

        NOTE: was a plain function in the class body (no self); @staticmethod
        added to make that explicit — call sites are unchanged.
        """
        # the point of this relatively complex function is to allow for protocol-less,
        # double-slash-prepended-less URLs that still get treated as absolute (i.e.,
        # non-relative) URLs and thus get their hosts parsed correctly by `urlsplit`.
        # This is important because I want to be able to use the host for a number of
        # things behind the scenes.
        is_relative = False
        u = urlsplit(url, "gemini")
        if u.scheme != "gemini":
            return None, None
        if u.hostname is None:
            if url.startswith("/"):
                # process relative link
                if parent_hostname is None:
                    return None, None
                joined = urljoin("gemini://{}".format(parent_hostname), url)
                u = urlsplit(joined, "gemini")
                is_relative = True
            else:
                # url does not start with /
                # could be: blah.com/test
                # could be: test
                url_split = url.split("/")
                if is_domain(url_split[0]):
                    # treat schemeless uris as non-gemini as announced in
                    # https://lists.orbitalfox.eu/archives/gemini/2020/003646.html
                    return None, None
                else:
                    # process relative link
                    if fully_qualified_parent_url is None:
                        return None, None
                    joined = urljoin(fully_qualified_parent_url, url)
                    u = urlsplit(joined, "gemini")
                    is_relative = True
        return u, is_relative

    def _get_normalized_url(self):
        """Lazily compute and cache the unquoted, port-stripped URL."""
        if not self.is_valid:
            return None
        if self._normalized_url is None:
            (
                self._normalized_url,
                self._normalized_host,
            ) = self._get_normalized_url_and_host()
        return self._normalized_url

    def _get_normalized_host(self):
        """Lazily compute and cache the lowercased hostname."""
        if not self.is_valid:
            return None
        if self._normalized_host is None:
            (
                self._normalized_url,
                self._normalized_host,
            ) = self._get_normalized_url_and_host()
        return self._normalized_host

    def _get_normalized_host_like(self):
        """Host plus a ~user/users-style path prefix, if present — treats
        per-user subtrees of a shared host as distinct 'hosts'."""
        if not self.is_valid:
            return None
        if self._normalized_host_like is None:
            normalized_host_like = self.normalized_host
            m = ROOT_LIKE_PATTERN.match(self.urlsplit.path)
            if m:
                normalized_host_like += m[0].rstrip("/")
            self._normalized_host_like = normalized_host_like
        return self._normalized_host_like

    def _get_fetchable_url(self):
        """A fully-qualified gemini:// URL with the fragment removed."""
        if not self.is_valid:
            return None
        if self._fetchable_url is None:
            if self.is_relative:
                # leave off fragment portion of urlsplit at [4]
                urlsplit_parts = list(self.urlsplit[:4])
                urlsplit_parts.append("")
                url = urlunsplit(urlsplit_parts)
            else:
                raw_url_lower = self.raw_url.lower()
                if raw_url_lower.startswith("gemini://"):
                    url = self.raw_url
                elif raw_url_lower.startswith("//"):
                    url = "gemini:{}".format(self.raw_url)
                else:
                    url = "gemini://{}".format(self.raw_url)
                # leave off fragment portion of urlsplit at [4]
                if self.urlsplit[4] != "":
                    url = url.replace("#{}".format(self.urlsplit[4]), "")
            self._fetchable_url = url
        return self._fetchable_url

    def _get_is_root_like(self):
        """True for capsule roots and ~user/users roots."""
        if self._is_root_like is None:
            is_root_like = False
            if (
                self.urlsplit.path == ""
                or self.urlsplit.path == "/"
                or ROOT_LIKE_ONLY_PATTERN.match(self.urlsplit.path)
            ):
                is_root_like = True
            self._is_root_like = is_root_like
        return self._is_root_like

    def _get_is_log_root_like(self):
        """True for capsule roots and gemlog index directories."""
        if self._is_log_root_like is None:
            is_log_root_like = False
            if (
                self.urlsplit.path == ""
                or self.urlsplit.path == "/"
                or LOG_ROOT_LIKE_PATTERN.match(self.urlsplit.path)
            ):
                is_log_root_like = True
            self._is_log_root_like = is_log_root_like
        return self._is_log_root_like

    def _get_is_log_post_like(self):
        """True when the path looks like an individual gemlog post, including
        site-specific layouts for gemlog.blue and gemini.conman.org."""
        if self._is_log_post_like is None:
            is_log_post_like = False
            post_like_match = LOG_POST_LIKE_PATTERN.match(self.urlsplit.path)
            post_like_exclusion_match = LOG_POST_LIKE_EXCLUSION_PATTERN.match(
                self.urlsplit.path
            )
            post_gemlogblue_match = LOG_POST_GEMLOGBLUE_LIKE_PATTERN.match(
                self.urlsplit.path
            )
            post_boston_match = LOG_POST_BOSTON_LIKE_PATTERN.match(self.urlsplit.path)
            if (
                (post_like_match and not post_like_exclusion_match)
                or (self.normalized_host == "gemlog.blue" and post_gemlogblue_match)
                or (self.normalized_host == "gemini.conman.org" and post_boston_match)
            ):
                is_log_post_like = True
            self._is_log_post_like = is_log_post_like
        return self._is_log_post_like

    def get_friendly_author(self, content):
        """Best-effort author name: URL ~user match, then 'by:/author:' lines
        in `content`, then the normalized host as a fallback."""
        if not self.is_valid:
            return None
        friendly_author = None
        author_url_match = AUTHOR_URL_PATTERN.match(self.urlsplit.path)
        if author_url_match:
            # first check url
            if author_url_match[1]:
                friendly_author = author_url_match[1]
            elif author_url_match[2]:
                friendly_author = author_url_match[2]
        if friendly_author is None:
            # if no URL match, try looking in page content
            if isinstance(content, str):
                author_content_match = AUTHOR_CONTENT_PATTERN.match(content)
                if author_content_match:
                    friendly_author = author_content_match[1]
        if friendly_author is None:
            # if still no match, use normalized host
            friendly_author = self.normalized_host
        return friendly_author

    def get_friendly_title(self, content):
        """Best-effort title: first '# ' heading in `content`, then a
        title-cased filename from the URL, then the bare path."""
        if not self.is_valid:
            return None
        friendly_title = None
        if isinstance(content, str):
            title_content_match = TITLE_CONTENT_PATTERN.match(content)
            if title_content_match:
                # first try page content
                friendly_title = title_content_match[1]
        if friendly_title is None:
            # if no content match, try looking in URL
            title_url_match = TITLE_URL_PATTERN.match(self.urlsplit.path)
            if title_url_match:
                friendly_title = (
                    title_url_match[2]
                    .replace("-", " ")
                    .replace("_", " ")
                    .strip()
                    .title()
                )
        if friendly_title is None:
            # if still no match, use URL path
            friendly_title = self.urlsplit.path.lstrip("/")
        return friendly_title

    def get_default_change_frequency(self, category):
        """Initial crawl frequency for this resource, by response category.

        Raises ValueError for an unknown category. (Originally raised
        `Exception.NameError(...)`, which does not exist and would itself
        blow up with AttributeError — fixed.)
        """
        if not self.is_valid:
            return None
        if self._default_change_frequency is None:
            if category == "content":
                # roots are re-crawled more eagerly than leaf pages
                if self.is_root_like or self.is_log_root_like:
                    change_frequency = constants.ROOT_CHANGE_FREQUENCY_DEFAULT
                else:
                    change_frequency = constants.NON_ROOT_CHANGE_FREQUENCY_DEFAULT
            elif category == "binary":
                change_frequency = constants.BINARY_CHANGE_FREQUENCY_DEFAULT
            elif category == "redirect":
                change_frequency = constants.REDIRECT_CHANGE_FREQUENCY_DEFAULT
            elif category == "temp_error":
                change_frequency = constants.TEMP_ERROR_CHANGE_FREQUENCY_DEFAULT
            elif category == "perm_error":
                change_frequency = constants.PERM_ERROR_CHANGE_FREQUENCY_DEFAULT
            elif category == "prompt":
                change_frequency = constants.PROMPT_CHANGE_FREQUENCY_DEFAULT
            else:
                raise ValueError("Unrecognized resource category")
            self._default_change_frequency = change_frequency
        return self._default_change_frequency

    def increment_change_frequency(self, existing_change_frequency, category):
        """Back off the crawl frequency by the category's increment.

        Raises ValueError for an unknown category (same fix as
        get_default_change_frequency).
        """
        if category == "content":
            if self.is_root_like or self.is_log_root_like:
                return existing_change_frequency + constants.ROOT_CHANGE_FREQUENCY_INCREMENT
            else:
                return existing_change_frequency + constants.NON_ROOT_CHANGE_FREQUENCY_INCREMENT
        elif category == "binary":
            return existing_change_frequency + constants.BINARY_CHANGE_FREQUENCY_INCREMENT
        elif category == "redirect":
            return existing_change_frequency + constants.REDIRECT_CHANGE_FREQUENCY_INCREMENT
        elif category == "temp_error":
            return existing_change_frequency + constants.TEMP_ERROR_CHANGE_FREQUENCY_INCREMENT
        elif category == "perm_error":
            return existing_change_frequency + constants.PERM_ERROR_CHANGE_FREQUENCY_INCREMENT
        elif category == "prompt":
            return existing_change_frequency + constants.PROMPT_CHANGE_FREQUENCY_INCREMENT
        else:
            raise ValueError("Unrecognized resource category")

    # constructed from fetchable_url
    # does not matter if quoted or unquoted so I choose arbitrarily to
    # standardize on unquoting it.
    normalized_url = property(_get_normalized_url)
    normalized_host = property(_get_normalized_host)
    # constructed from urlsplit or raw_url
    # should be quoted.
    fetchable_url = property(_get_fetchable_url)
    # constructed from fetchable_url
    # should be unquoted.
    is_root_like = property(_get_is_root_like)
    is_log_root_like = property(_get_is_log_root_like)
    is_log_post_like = property(_get_is_log_post_like)
    normalized_host_like = property(_get_normalized_host_like)

    def fetch(self):
        """Fetch this resource over Gemini; returns gusmobile's response."""
        # NB: this intentionally does NOT fetch the normalized URL, because that could
        # cause an infinite loop with, e.g., normalization stripping a trailing slash
        # and a server redirecting to the same URL _with_ a trailing slash.
        return gusmobile.fetch(self.fetchable_url)

    def _get_normalized_url_and_host(self):
        """Return (normalized_url, normalized_host): unquoted, trailing slash
        stripped, default :1965 port removed, host lowercased."""
        url_normalized = unquote(self.fetchable_url.rstrip("/"))
        url_normalized = url_normalized.replace(
            self.urlsplit.hostname.lower() + ":1965",
            self.urlsplit.hostname.lower(),
            1,
        )
        host_normalized = self.urlsplit.hostname.lower()
        return url_normalized, host_normalized

    def extract_contained_resources(self, content):
        """Return the valid GeminiResources linked from `content` (cached)."""
        # this finds all gemini URLs within the content of a given GeminiResource and
        # returns them as a list of new GeminiResources
        if self.contained_resources:
            return self.contained_resources
        link_pattern = r"^=>\s*(\S+)"
        # strip ``` preformatted sections so their text can't look like links
        preformat_pattern = r"^```.*?^```"
        content_without_preformat = re.sub(
            preformat_pattern, "", content, flags=re.DOTALL | re.MULTILINE
        )
        probable_urls = re.findall(
            link_pattern, content_without_preformat, re.MULTILINE
        )
        resources = []
        for url in probable_urls:
            resource = GeminiResource(
                url,
                fully_qualified_parent_url=self.fetchable_url,
                parent_hostname=self.urlsplit.hostname,
            )
            if resource.is_valid:
                resources.append(resource)
        self.contained_resources = resources
        return self.contained_resources