💾 Archived View for gmn.clttr.info › sources › gusmobile.git › tree › gusmobile › client.py.txt captured on 2024-02-05 at 10:07:42.
⬅️ Previous capture (2023-09-08)
-=-=-=-=-=-=-
# client.py
#
# Copyright 2019 Jason McBrayer
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

from email.message import EmailMessage
import codecs
import collections
import fnmatch
import io
import mimetypes
import os.path
import random
import shlex
import shutil
import socket
import subprocess
import tempfile
import urllib.parse
import ssl
import sys
import time


class Response:
    """Outcome of a single request made with fetch().

    Every field defaults to None; fetch() fills in only the fields that
    are meaningful for the status class it received (for example
    ``prompt`` for 1x, ``error_message`` for 4x/5x, ``content`` for 2x).
    """

    content = None
    content_type = None
    charset = None
    lang = None
    url = None
    status = None
    status_meta = None
    prompt = None
    num_bytes = None
    error_message = None

    def __init__(
        self,
        content=None,
        content_type=None,
        charset=None,
        lang=None,
        url=None,
        status=None,
        status_meta=None,
        prompt=None,
        num_bytes=None,
        error_message=None,
    ):
        self.content = content
        self.content_type = content_type
        self.charset = charset
        self.lang = lang
        self.url = url
        self.status = status
        self.status_meta = status_meta
        self.prompt = prompt
        self.num_bytes = num_bytes
        self.error_message = error_message


def fetch(raw_url):
    """Request ``raw_url`` and return a Response, or None on failure.

    The URL is parsed with ``gemini`` as the default scheme.  Failures
    (URL without a network location, connection errors, malformed
    headers, undecodable bodies, ...) are reported with print() and
    signalled by returning None rather than by raising.

    The connection opened for the request is always closed before this
    function returns.
    """
    url = urllib.parse.urlparse(raw_url, "gemini")

    # Nothing has been opened yet at this point, so there is nothing to
    # close: just report and bail out.
    if not url.netloc:
        print("ERROR: {} parses with no netloc".format(raw_url))
        return None

    f = None
    try:
        # Do everything which touches the network for the header in one
        # block, so we only need to catch exceptions once.
        try:
            _address, f = _send_request(url)
            header = f.readline(1027).decode("UTF-8")
        except Exception as err:
            _report_connection_error(err)
            return None

        if not header or header[-1] != "\n":
            _debug("ERROR: Received invalid header from server!")
            return None
        header = header.strip()
        _debug("Response header: %s." % header)

        return _handle_response(url, header, f)
    finally:
        if f is not None:
            f.close()


def _report_connection_error(err):
    """Print a user-facing message for an error raised while connecting."""
    if isinstance(err, socket.gaierror):
        print("ERROR: DNS error!")
    elif isinstance(err, ConnectionRefusedError):
        print("ERROR: Connection refused!")
    elif isinstance(err, ConnectionResetError):
        print("ERROR: Connection reset!")
    elif isinstance(err, (TimeoutError, socket.timeout)):
        print(
            """ERROR: Connection timed out!
Slow internet connection? Use 'set timeout' to be more patient."""
        )
    else:
        print("ERROR: " + str(err))


def _handle_response(url, header, f):
    """Interpret a stripped response ``header`` and build the Response.

    ``url`` is the parsed request URL and ``f`` the binary file object
    the body is read from for success responses.  Returns a Response, or
    None after printing an error message.
    """
    # Validate header
    header_split = header.split(maxsplit=1)
    if not header_split:
        print("ERROR: Received invalid header from server!")
        return None
    status = header_split[0]
    meta = header_split[1] if len(header_split) > 1 else None
    if len(header) > 1024 or len(status) != 2 or not status.isnumeric():
        print("ERROR: Received invalid header from server!")
        return None

    # Handle headers. Not all headers are handled yet.
    # Input
    if status.startswith("1"):
        if meta is None:
            print("ERROR: Input status requires a meta value in header!")
            return None
        return Response(url=url.geturl(), status=status, prompt=meta)

    # Redirects
    if status.startswith("3"):
        if meta is None:
            print("ERROR: Redirect status requires a meta value in header!")
            return None
        return Response(
            url=urllib.parse.urlparse(meta).geturl(),
            status=status,
        )

    # Errors
    if status.startswith(("4", "5")):
        if meta is None:
            print("ERROR: Error status requires a meta value in header!")
            return None
        return Response(status=status, error_message=meta)

    # Client cert
    if status.startswith("6"):
        print("ERROR: The requested resource requires client-certificate")
        return None

    # Invalid status
    if not status.startswith("2"):
        print("ERROR: Server returned undefined status code %s!" % status)
        return None

    # Handle success (only 2x statuses reach this point)
    if meta is None:
        print("ERROR: Success status requires a meta value in header!")
        return None
    mime = meta
    if mime == "":
        mime = "text/gemini; charset=utf-8"

    # Let the email package parse the MIME type and its parameters.
    msg = EmailMessage()
    msg["content-type"] = mime
    mime, mime_options = msg.get_content_type(), msg["Content-Type"].params

    default_charset = "utf-8"
    charset = None
    if "charset" in mime_options:
        try:
            codecs.lookup(mime_options["charset"])
            charset = mime_options["charset"]
        except LookupError:
            print("Header declared unknown encoding %s" % mime_options["charset"])
            return None
    lang = mime_options["lang"] if "lang" in mime_options else None

    # Read the response body over the network
    try:
        body = f.read()
    except Exception:
        print("Error reading response over network!")
        return None

    if mime.startswith("text/"):
        effective_charset = charset or default_charset
        try:
            content = codecs.decode(body, effective_charset)
        except Exception:
            # Best effort: an undecodable body is a failed fetch, but say
            # so instead of failing silently.
            print(
                "ERROR: problem decoding content with %s charset"
                % effective_charset
            )
            return None
    else:
        content = body

    return Response(
        content=content,
        content_type=mime,
        charset=charset,
        lang=lang,
        num_bytes=len(body),
        url=url.geturl(),
        status=status,
    )


def _build_tls_context():
    """Create the client-side TLS context used for every connection.

    NOTE: hostname checking and certificate verification are both
    disabled here, so the server's certificate is not validated.
    """
    context = ssl.SSLContext(protocol=ssl.PROTOCOL_TLS_CLIENT)
    context.check_hostname = False
    context.verify_mode = ssl.CERT_NONE

    # Impose minimum TLS version (1.2)
    if hasattr(ssl, "TLSVersion"):
        context.minimum_version = ssl.TLSVersion.TLSv1_2
    else:
        # Older Pythons have no minimum_version; switch off everything
        # below TLS 1.2 through the option flags instead.
        context.options |= (
            ssl.OP_NO_SSLv2
            | ssl.OP_NO_SSLv3
            | ssl.OP_NO_TLSv1
            | ssl.OP_NO_TLSv1_1
        )

    context.set_ciphers(
        "AES256-GCM-SHA384:AES128-GCM-SHA256:ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256:!aNULL:!eNULL:!EXPORT:!DES:!RC4:!3DES:!MD5:!PSK"
    )
    return context


def _send_request(url):
    """Send a selector to a given host and port.

    ``url`` is a parsed URL; port 1965 is used when it names none.
    Returns the resolved address and binary file with the reply.
    Closing that file releases the underlying connection.

    Raises the OSError from the last connection attempt if no address
    could be reached.
    """
    port = url.port if url.port is not None else 1965
    addresses = _get_addresses(url.hostname, port)
    context = _build_tls_context()

    # Connect to remote host by any address possible
    err = None
    for address in addresses:
        _debug("Connecting to: " + str(address[4]))
        raw = socket.socket(address[0], address[1])
        try:
            raw.settimeout(15.0)
            s = context.wrap_socket(raw, server_hostname=url.hostname)
        except Exception:
            raw.close()
            raise
        try:
            s.connect(address[4])
            break
        except OSError as e:
            # Do not leak the socket of a failed attempt.
            s.close()
            err = e
    else:
        # If we couldn't connect to *any* of the addresses, just
        # bubble up the exception from the last attempt and deny
        # knowledge of earlier failures.
        if err is None:
            raise OSError("No addresses to connect to!")
        raise err

    try:
        _debug("Established {} connection.".format(s.version()))
        _debug("Cipher is: {}.".format(s.cipher()))

        # Send request and wrap response in a file descriptor
        _debug("Sending %s<CRLF>" % url.geturl())
        s.sendall((url.geturl() + "\r\n").encode("UTF-8"))
        return address, s.makefile(mode="rb")
    finally:
        # If makefile() succeeded the file object keeps the connection
        # alive and the real close happens when that file is closed;
        # otherwise this closes the socket immediately.
        s.close()


def _get_addresses(host, port):
    """Resolve ``host``/``port`` to getaddrinfo() entries, IPv6 first."""
    # DNS lookup - will get IPv4 and IPv6 records if IPv6 is enabled
    if ":" in host:
        # This is likely a literal IPv6 address, so we can *only* ask for
        # IPv6 addresses or getaddrinfo will complain
        family_mask = socket.AF_INET6
    elif socket.has_ipv6:
        # Accept either IPv4 or IPv6 addresses
        family_mask = 0
    else:
        # IPv4 only
        family_mask = socket.AF_INET
    addresses = socket.getaddrinfo(
        host, port, family=family_mask, type=socket.SOCK_STREAM
    )
    # Sort addresses so IPv6 ones come first
    addresses.sort(key=lambda add: add[0] == socket.AF_INET6, reverse=True)
    return addresses


def _parse_url(url):
    """Work around issues with Python's urllib.parse (not implemented)."""
    pass


def _debug(message):
    """Debug-output hook; currently discards ``message``."""
    pass