going-flying.com gemini git repository
29afb90586371896e0c2e8812dfde3e292cc240f - Matthew Ernisse - 1612278406
gusmobile from https://git.carcosa.net/jmcbray/gusmobile/
diff --git a/capcom/gusmobile/__init__.py b/capcom/gusmobile/__init__.py new file mode 100644 index 0000000..0572537 --- /dev/null +++ b/capcom/gusmobile/__init__.py @@ -0,0 +1,2 @@ +from .client import Response +from .client import fetch diff --git a/capcom/gusmobile/client.py b/capcom/gusmobile/client.py new file mode 100644 index 0000000..b80db41 --- /dev/null +++ b/capcom/gusmobile/client.py @@ -0,0 +1,208 @@ +# client.py +# +# Copyright 2019 Jason McBrayer +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. + +import cgi +import codecs +import collections +import fnmatch +import io +import mimetypes +import os.path +import random +import shlex +import shutil +import socket +import subprocess +import tempfile +import urllib.parse +import ssl +import sys +import time + +GEMINI_TIMEOUT = 15 + + +class Response: + content = None + content_type = None + url = None + status = None + status_meta = None + + def __init__( + self, content=None, content_type=None, url=None, status=None, status_meta=None + ): + self.content = content + self.content_type = content_type + self.url = url + self.status = status + self.status_meta = status_meta + + +def fetch(url): + # Do everything which touches the network in one block, + # so we only need to catch exceptions once + url = urllib.parse.urlparse(url) + header = "" + try: + # Is this a local file? + if not url.netloc: + address, f = None, open(url.path, "rb") + else: + address, f = _send_request(url) + # Read response header + header = f.readline() + header = header.decode("UTF-8").strip() + _debug("Response header: %s." % header) + + # Catch network errors which may happen on initial connection + except Exception as err: + # Print an error message + if isinstance(err, socket.gaierror): + print("ERROR: DNS error!") + elif isinstance(err, ConnectionRefusedError): + print("ERROR: Connection refused!") + elif isinstance(err, ConnectionResetError): + print("ERROR: Connection reset!") + elif isinstance(err, (TimeoutError, socket.timeout)): + print( + """ERROR: Connection timed out! + Slow internet connection? Use 'set timeout' to be more patient.""" + ) + else: + print("ERROR: " + str(err)) + return + # Validate header + status, meta = header.split(maxsplit=1) + if len(header) > 1024 or len(status) != 2 or not status.isnumeric(): + print("ERROR: Received invalid header from server!") + f.close() + return + + # Handle headers. Not all headers are handled yet. + if status.startswith("1"): + raise NotImplementedError() + # Redirects + elif status.startswith("3"): + raise NotImplementedError() + # Errors + elif status.startswith("4") or status.startswith("5"): + raise NotImplementedError() + # Client cert + elif status.startswith("6"): + raise NotImplementedError() + # Invalid status + elif not status.startswith("2"): + print("ERROR: Server returned undefined status code %s!" % status) + return + + # Handle success + assert status.startswith("2") + mime = meta + if mime == "": + mime = "text/gemini; charset=utf-8" + mime, mime_options = cgi.parse_header(mime) + charset = "utf-8" + if "charset" in mime_options: + try: + codecs.lookup(mime_options["charset"]) + charset = mime_options["charset"] + except LookupError: + print("Header declared unknown encoding %s" % value) + return + # Read the response body over the network + body = f.read() + return Response( + content=codecs.decode(body, charset), + content_type=mime, + url=url.geturl(), + status=status, + ) + + +def _send_request(url): + """Send a selector to a given host and port. + Returns the resolved address and binary file with the reply.""" + addresses = _get_addresses(url.hostname, url.port) + # Connect to remote host by any address possible + err = None + for address in addresses: + _debug("Connecting to: " + str(address[4])) + s = socket.socket(address[0], address[1]) + s.settimeout(GEMINI_TIMEOUT) + context = ssl.SSLContext() + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + # Impose minimum TLS version + if sys.version_info.minor == 7: + context.minimum_version = ssl.TLSVersion.TLSv1_2 + else: + context.options | ssl.OP_NO_TLSv1_1 + context.options | ssl.OP_NO_SSLv3 + context.options | ssl.OP_NO_SSLv2 + context.set_ciphers( + "AES+DHE:AES+ECDHE:CHACHA20+DHE:CHACHA20+ECDHE:!SHA1:@STRENGTH" + ) + # print(context.get_ciphers()) + s = context.wrap_socket(s, server_hostname=url.hostname) + try: + s.connect(address[4]) + break + except OSError as e: + err = e + else: + # If we couldn't connect to *any* of the addresses, just + # bubble up the exception from the last attempt and deny + # knowledge of earlier failures. + raise err + + _debug("Established {} connection.".format(s.version())) + _debug("Cipher is: {}.".format(s.cipher())) + + # Send request and wrap response in a file descriptor + _debug("Sending %s<CRLF>" % url.geturl()) + s.sendall((url.geturl() + "\r\n").encode("UTF-8")) + return address, s.makefile(mode="rb") + + +def _get_addresses(host, port): + # DNS lookup - will get IPv4 and IPv6 records if IPv6 is enabled + if ":" in host: + # This is likely a literal IPv6 address, so we can *only* ask for + # IPv6 addresses or getaddrinfo will complain + family_mask = socket.AF_INET6 + elif socket.has_ipv6: + # Accept either IPv4 or IPv6 addresses + family_mask = 0 + else: + # IPv4 only + family_mask = socket.AF_INET + addresses = socket.getaddrinfo( + host, port, family=family_mask, type=socket.SOCK_STREAM + ) + # Sort addresses so IPv6 ones come first + addresses.sort(key=lambda add: add[0] == socket.AF_INET6, reverse=True) + return addresses + + +def _parse_url(url): + """Work around issues with Python's urrlib.parse""" + pass + + +def _debug(message): + pass