import re
from urllib.parse import (
    quote,
    unquote,
    urljoin,
    urlparse,
    urlsplit,
    urlunparse,
    urlunsplit,
    uses_relative,
    uses_netloc,
)
from urllib.robotparser import RobotFileParser

import gusmobile

from gus import constants
from gus.lib.domain import is_domain

# hack: the built-in methods in urllib need to know the
# Gemini protocol exists
uses_relative.append("gemini")
uses_netloc.append("gemini")

# Path ends in a directory whose name suggests it is the index of a log.
LOG_ROOT_LIKE_PATTERN = re.compile(
    r".*/(gemlog|glog|journal|twinlog|posts|post|tangents|phlog|starlog|pikkulog|blog|log)/?$",
    flags=re.IGNORECASE,
)
# Path lies below a log-like directory, or contains a date-like component.
LOG_POST_LIKE_PATTERN = re.compile(
    r".*/((gemlog|glog|journal|twinlog|posts|post|tangents|phlog|starlog|pikkulog|blog|log)/.+$|.*/\d{4}[-_]\d{2}[-_]\d{2}.*$|.*/(19|20)\d{6}.*$)",
    flags=re.IGNORECASE,
)
# Paths that match LOG_POST_LIKE_PATTERN but must not be treated as posts.
LOG_POST_LIKE_EXCLUSION_PATTERN = re.compile(
    r".*/(games|archive|archives|rss|handlers|diagnostics)/.*|.*atom.xml$|.*gemlog.gmi$|.*index.gmi$|.*index.gemini$",
    flags=re.IGNORECASE,
)
# Host-specific post layouts; only applied for the matching host in
# GeminiResource._get_is_log_post_like.
LOG_POST_GEMLOGBLUE_LIKE_PATTERN = re.compile(
    r"^/users/[a-z][-a-z0-9]*/\d+\.gmi?", flags=re.IGNORECASE
)
LOG_POST_BOSTON_LIKE_PATTERN = re.compile(
    r"^/boston/\d{4}/\d{2}/\d+\.\d+", flags=re.IGNORECASE
)
# The whole path is a user directory ("/~name", "/users/name" or "/users").
ROOT_LIKE_ONLY_PATTERN = re.compile(
    r"^/(~[a-z][-a-z0-9]*|users/[a-z][-a-z0-9]*|users)/?$", flags=re.IGNORECASE
)
# The path starts with a user directory (same alternatives, prefix match).
ROOT_LIKE_PATTERN = re.compile(
    r"^/(~[a-z][-a-z0-9]*|users/[a-z][-a-z0-9]*|users)/?", flags=re.IGNORECASE
)
AUTHOR_URL_PATTERN = re.compile(
    r"^/~([a-z][-a-z0-9]*)/|^/users/~?([a-z][-a-z0-9]*)", flags=re.IGNORECASE
)
AUTHOR_CONTENT_PATTERN = re.compile(
    r".*(by|author): ([\w\s\d]+)", flags=re.IGNORECASE | re.MULTILINE
)
TITLE_CONTENT_PATTERN = re.compile(r"^#\s(.*)$", flags=re.IGNORECASE | re.MULTILINE)
TITLE_URL_PATTERN = re.compile(
    r".*/(\d{8}[-_]|\d{4}[-_]\d{2}[-_]\d{2}[-_])?([a-z0-9-_]+)(\.[a-z0-9]+)$",
    flags=re.IGNORECASE,
)


class GeminiRobotFileParser(RobotFileParser):
    """A ``RobotFileParser`` that retrieves robots.txt through ``GeminiResource``.

    It also adds ``can_fetch_prioritized``, which evaluates a prioritized
    list of user-agents instead of a single one.
    """

    def set_url(self, url):
        """Sets the URL referring to a robots.txt file."""
        self.url = url
        u, _ = GeminiResource.urlsplit_featureful(url)
        # items 1 and 2 of the split result are netloc and path
        self.host, self.path = u[1:3]

    def read(self):
        """Reads the robots.txt URL and feeds it to the parser."""
        gr = GeminiResource(self.url)
        response = gr.fetch()
        if response is None:
            # no response at all: treat as "no robots.txt"
            self.allow_all = True
            return
        if not response.status.startswith("2") or not response.content_type.startswith(
            "text/"
        ):
            # a non-2x status or a non-text body is not a usable robots.txt
            self.allow_all = True
        else:
            self.parse(response.content.splitlines())

    def read_from_string(self, robots_txt):
        """An utility method for writing tests"""
        self.parse(robots_txt.splitlines())

    def can_fetch_prioritized(self, useragents, url):
        """Given a url and prioritized list of user-agents, is fetching allowed?

        Priority is with the highest priority first; eg. ["ThisIndexerBot",
        "generic-indexer", "generic-bot", "*"].
        """
        if self.allow_all:
            return True
        if self.disallow_all:
            return False
        if not self.last_checked:
            # nothing has been parsed yet, so nothing is known to be allowed
            return False

        # reduce the URL to its quoted path/params/query/fragment; scheme and
        # netloc are dropped before matching against the rules
        parsed_url = urlparse(unquote(url))
        url = urlunparse(
            (
                "",
                "",
                parsed_url.path,
                parsed_url.params,
                parsed_url.query,
                parsed_url.fragment,
            )
        )
        url = quote(url) or "/"

        def useragent_allowed(useragent):
            # allowance of the first entry that applies to this user-agent,
            # or None when no entry applies
            for entry in self.entries:
                if entry.applies_to(useragent):
                    return entry.allowance(url)
            return None

        # map user-agents to allowances; the first non-None will be the
        # prioritized allowance
        for ua in useragents:
            allowed = useragent_allowed(ua)
            if allowed is not None:
                return allowed

        # if none of the user-agents match, check default entry
        if self.default_entry:
            return self.default_entry.allowance(url)

        # if nothing matches, crawling is allowed
        return True


class GeminiResource:
    """A Gemini URL together with lazily computed, cached facts about it.

    ``is_valid`` is False when the URL could not be interpreted as a gemini
    URL; in that case ``urlsplit`` is None. The ``_get_is_*`` accessors and
    ``extract_contained_resources`` read ``self.urlsplit`` without checking
    ``is_valid`` and must only be used on valid resources.
    """

    def __init__(self, url, fully_qualified_parent_url=None, parent_hostname=None):
        """Parse ``url``, resolving relative links against the given parent.

        Args:
            url: the raw URL or link target.
            fully_qualified_parent_url: base URL used to resolve relative
                links that do not start with "/".
            parent_hostname: hostname used to resolve links starting with "/".
        """
        self.raw_url = url
        self.urlsplit, self.is_relative = GeminiResource.urlsplit_featureful(
            url,
            fully_qualified_parent_url=fully_qualified_parent_url,
            parent_hostname=parent_hostname,
        )
        self.is_valid = self.urlsplit is not None
        self.fully_qualified_parent_url = fully_qualified_parent_url
        # caches for the lazily computed properties below; None = not computed
        self._normalized_host = None
        self._normalized_host_like = None
        self._fetchable_url = None
        self._is_root_like = None
        self._is_log_root_like = None
        self._is_log_post_like = None
        self._default_change_frequency = None
        self.contained_resources = None

    @staticmethod
    def urlsplit_featureful(url, fully_qualified_parent_url=None, parent_hostname=None):
        """Split ``url`` into its parts, resolving relative links.

        Returns:
            A ``(split_result, is_relative)`` tuple, or ``(None, None)`` when
            the URL is not a gemini URL, looks like a schemeless domain, or is
            relative while the parent information needed to resolve it is
            missing.
        """
        # the point of this relatively complex function is to allow for protocol-less,
        # double-slash-prepended-less URLs that still get treated as absolute (i.e.,
        # non-relative) URLs and thus get their hosts parsed correctly by `urlsplit`.
        # This is important because I want to be able to use the host for a number of
        # things behind the scenes.
        is_relative = False
        u = urlsplit(url, "gemini")
        if u.scheme != "gemini":
            return None, None
        if u.hostname is None:
            if url.startswith("/"):
                # process relative link
                if parent_hostname is None:
                    return None, None
                joined = urljoin("gemini://{}".format(parent_hostname), url)
                u = urlsplit(joined, "gemini")
                is_relative = True
            else:
                # url does not start with /
                # could be: blah.com/test
                # could be: test
                url_split = url.split("/")
                if is_domain(url_split[0]):
                    # treat schemeless uris as non-gemini as announced in
                    # https://lists.orbitalfox.eu/archives/gemini/2020/003646.html
                    return None, None
                else:
                    # process relative link
                    if fully_qualified_parent_url is None:
                        return None, None
                    joined = urljoin(fully_qualified_parent_url, url)
                    u = urlsplit(joined, "gemini")
                    is_relative = True
        return u, is_relative

    def _get_normalized_host(self):
        """Return the hostname of the parsed URL, or None if invalid."""
        if not self.is_valid:
            return None
        if self._normalized_host is None:
            self._normalized_host = self.urlsplit.hostname
        return self._normalized_host

    def _get_normalized_host_like(self):
        """Return the host, extended by a leading user directory if present.

        Returns None for an invalid resource.
        """
        if not self.is_valid:
            return None
        if self._normalized_host_like is None:
            normalized_host_like = self.normalized_host
            m = ROOT_LIKE_PATTERN.match(self.urlsplit.path)
            if m:
                normalized_host_like += m[0].rstrip("/")
            self._normalized_host_like = normalized_host_like
        return self._normalized_host_like

    def _get_fetchable_url(self):
        """Return the URL to request, or None if the resource is invalid.

        The port is omitted when it is absent or 1965, an empty path becomes
        "/", and the query is appended only when non-empty.
        """
        if not self.is_valid:
            return None
        if self._fetchable_url is None:
            # we deliberately do not work with the fragment part
            self._fetchable_url = "{}://{}{}{}{}".format(
                self.urlsplit.scheme,
                self.urlsplit.hostname,
                ""
                if self.urlsplit.port is None or self.urlsplit.port == 1965
                else ":{}".format(self.urlsplit.port),
                "/" if self.urlsplit.path == "" else self.urlsplit.path,
                "" if self.urlsplit.query == "" else "?{}".format(self.urlsplit.query),
            )
        return self._fetchable_url

    def _get_is_root_like(self):
        """Return True if the path is empty, "/", or only a user directory."""
        if self._is_root_like is None:
            is_root_like = False
            if (
                self.urlsplit.path == ""
                or self.urlsplit.path == "/"
                or ROOT_LIKE_ONLY_PATTERN.match(self.urlsplit.path)
            ):
                is_root_like = True
            self._is_root_like = is_root_like
        return self._is_root_like

    def _get_is_log_root_like(self):
        """Return True if the path is empty, "/", or ends in a log directory."""
        if self._is_log_root_like is None:
            is_log_root_like = False
            if (
                self.urlsplit.path == ""
                or self.urlsplit.path == "/"
                or LOG_ROOT_LIKE_PATTERN.match(self.urlsplit.path)
            ):
                is_log_root_like = True
            self._is_log_root_like = is_log_root_like
        return self._is_log_root_like

    def _get_is_log_post_like(self):
        """Return True if the path looks like an individual log post.

        Either the generic post pattern matches and the exclusion pattern does
        not, or one of the host-specific patterns matches on its own host.
        """
        if self._is_log_post_like is None:
            is_log_post_like = False
            post_like_match = LOG_POST_LIKE_PATTERN.match(self.urlsplit.path)
            post_like_exclusion_match = LOG_POST_LIKE_EXCLUSION_PATTERN.match(
                self.urlsplit.path
            )
            post_gemlogblue_match = LOG_POST_GEMLOGBLUE_LIKE_PATTERN.match(
                self.urlsplit.path
            )
            post_boston_match = LOG_POST_BOSTON_LIKE_PATTERN.match(self.urlsplit.path)

            if (
                (post_like_match and not post_like_exclusion_match)
                or (self.normalized_host == "gemlog.blue" and post_gemlogblue_match)
                or (self.normalized_host == "gemini.conman.org" and post_boston_match)
            ):
                is_log_post_like = True
            self._is_log_post_like = is_log_post_like
        return self._is_log_post_like

    def get_default_change_frequency(self, category):
        """Return the default change frequency for ``category``.

        The value is cached on first use: once computed, later calls return
        the cached value whatever ``category`` they pass.

        Args:
            category: one of "content", "binary", "redirect", "temp_error",
                "perm_error" or "prompt".

        Returns:
            The matching constant from ``constants``, or None if the resource
            is invalid.

        Raises:
            ValueError: if ``category`` is not recognized (and nothing has
                been cached yet).
        """
        if not self.is_valid:
            return None
        if self._default_change_frequency is None:
            if category == "content":
                if self.is_root_like or self.is_log_root_like:
                    change_frequency = constants.ROOT_CHANGE_FREQUENCY_DEFAULT
                else:
                    change_frequency = constants.NON_ROOT_CHANGE_FREQUENCY_DEFAULT
            elif category == "binary":
                change_frequency = constants.BINARY_CHANGE_FREQUENCY_DEFAULT
            elif category == "redirect":
                change_frequency = constants.REDIRECT_CHANGE_FREQUENCY_DEFAULT
            elif category == "temp_error":
                change_frequency = constants.TEMP_ERROR_CHANGE_FREQUENCY_DEFAULT
            elif category == "perm_error":
                change_frequency = constants.PERM_ERROR_CHANGE_FREQUENCY_DEFAULT
            elif category == "prompt":
                change_frequency = constants.PROMPT_CHANGE_FREQUENCY_DEFAULT
            else:
                # `Exception.NameError` does not exist; raising it produced an
                # unrelated AttributeError instead of this message
                raise ValueError("Unrecognized resource category")

            self._default_change_frequency = change_frequency
        return self._default_change_frequency

    def increment_change_frequency(self, existing_change_frequency, category):
        """Return ``existing_change_frequency`` plus the increment for ``category``.

        Args:
            existing_change_frequency: the current change frequency.
            category: one of "content", "binary", "redirect", "temp_error",
                "perm_error" or "prompt".

        Raises:
            ValueError: if ``category`` is not recognized.
        """
        if category == "content":
            if self.is_root_like or self.is_log_root_like:
                return existing_change_frequency + constants.ROOT_CHANGE_FREQUENCY_INCREMENT
            else:
                return existing_change_frequency + constants.NON_ROOT_CHANGE_FREQUENCY_INCREMENT
        elif category == "binary":
            return existing_change_frequency + constants.BINARY_CHANGE_FREQUENCY_INCREMENT
        elif category == "redirect":
            return existing_change_frequency + constants.REDIRECT_CHANGE_FREQUENCY_INCREMENT
        elif category == "temp_error":
            return existing_change_frequency + constants.TEMP_ERROR_CHANGE_FREQUENCY_INCREMENT
        elif category == "perm_error":
            return existing_change_frequency + constants.PERM_ERROR_CHANGE_FREQUENCY_INCREMENT
        elif category == "prompt":
            return existing_change_frequency + constants.PROMPT_CHANGE_FREQUENCY_INCREMENT
        else:
            # see get_default_change_frequency: `Exception.NameError` is not a
            # real exception type
            raise ValueError("Unrecognized resource category")

    def fetch(self):
        """Fetch this resource via ``gusmobile`` and return its response."""
        # NB: this intentionally does NOT fetch the normalized URL, because that could
        # cause an infinite loop with, e.g., normalization stripping a trailing slash
        # and a server redirecting to the same URL _with_ a trailing slash.
        return gusmobile.fetch(self.fetchable_url)

    def extract_contained_resources(self, content):
        """Return the valid GeminiResources linked from ``content``.

        Link lines ("=> target") inside preformatted blocks are ignored. The
        result is cached on the instance; later calls return the cached list
        without looking at ``content`` again.
        """
        # this finds all gemini URLs within the content of a given GeminiResource and
        # returns them as a list of new GeminiResources
        if self.contained_resources is not None:
            # `is not None` so that an empty result is cached as well
            return self.contained_resources

        link_pattern = r"^=>\s*(\S+)"
        preformat_pattern = r"^```.*?^```"
        content_without_preformat = re.sub(
            preformat_pattern, "", content, flags=re.DOTALL | re.MULTILINE
        )
        probable_urls = re.findall(
            link_pattern, content_without_preformat, re.MULTILINE
        )
        resources = []
        for url in probable_urls:
            resource = GeminiResource(
                url,
                fully_qualified_parent_url=self.fetchable_url,
                parent_hostname=self.urlsplit.hostname.lower(),
            )
            if resource.is_valid:
                resources.append(resource)
        self.contained_resources = resources

        return self.contained_resources

    # hostname component of the parsed URL
    normalized_host = property(_get_normalized_host)
    # rebuilt from the urlsplit parts, without the fragment
    fetchable_url = property(_get_fetchable_url)
    # classifications derived from the path of the parsed URL
    is_root_like = property(_get_is_root_like)
    is_log_root_like = property(_get_is_log_root_like)
    is_log_post_like = property(_get_is_log_post_like)
    # pubnix-aware host version, means that the user-specific dir is appended
    normalized_host_like = property(_get_normalized_host_like)