💾 Archived View for mozz.us › jetforce › jetforce › protocol.py captured on 2022-07-16 at 22:42:29.

View Raw

More Information

⬅️ Previous capture (2020-09-24)

➡️ Next capture (2023-03-20)

🚧 View Differences

-=-=-=-=-=-=-

from __future__ import annotations

import time
import traceback
import typing
import urllib.parse

from twisted.internet.address import IPv4Address, IPv6Address
from twisted.internet.defer import CancelledError, Deferred, ensureDeferred
from twisted.internet.protocol import connectionDone
from twisted.internet.task import deferLater
from twisted.protocols.basic import LineOnlyReceiver
from twisted.python.failure import Failure

from .__version__ import __version__
from .app.base import ApplicationCallable, EnvironDict, Status
from .tls import inspect_certificate

if typing.TYPE_CHECKING:
    from .server import GeminiServer


class GeminiProtocol(LineOnlyReceiver):
    """
    Handle a single Gemini Protocol TCP request.

    The request handler manages the life of a single gemini request. It exposes
    a simplified interface to read the request URL and write the gemini response
    status line and body to the socket. The request URL and other server
    information is stuffed into an ``environ`` dictionary that encapsulates the
    request at a low level. This dictionary, along with a callback to write the
    response data, is passed to a configurable "application" function or class.

    This design borrows heavily from the standard library's HTTP request
    handler (http.server.BaseHTTPRequestHandler). However, I did not make any
    attempts to directly emulate the existing conventions, because Gemini is an
    inherently simpler protocol than HTTP and much of the boilerplate could be
    removed.
    """

    TIMESTAMP_FORMAT = "%d/%b/%Y:%H:%M:%S %z"
    DEBUG = False

    client_addr: typing.Union[IPv4Address, IPv6Address]
    connected_timestamp: time.struct_time
    request: bytes
    url: str
    status: int
    meta: str
    response_buffer: str
    response_size: int

    def __init__(self, server: GeminiServer, app: ApplicationCallable):
        self.server = server
        self.app = app
        self._currently_deferred: typing.Optional[Deferred] = None

    def connectionMade(self) -> None:
        """
        This is invoked by twisted after the connection is first established.
        """
        self.connected_timestamp = time.localtime()
        self.response_size = 0
        self.response_buffer = ""
        self.client_addr = self.transport.getPeer()

    def connectionLost(self, reason: Failure = connectionDone) -> None:
        """
        This is invoked by twisted after the connection has been closed.
        """
        if self._currently_deferred:
            self._currently_deferred.cancel()

    def lineReceived(self, line: bytes) -> Deferred:
        """
        This method is invoked by LineOnlyReceiver for every incoming line.
        """
        self.request = line
        return ensureDeferred(self._handle_request_noblock())

    def lineLengthExceeded(self, line: bytes) -> None:
        """
        Called when the maximum line length has been reached.
        """
        self.finish_connection()

    def finish_connection(self) -> None:
        """
        Send the TLS "close_notify" alert and then immediately close the TCP
        connection without waiting for the client to respond with it's own
        "close_notify" alert.

        > It is acceptable for an application to only send its shutdown alert
        > and then close the underlying connection without waiting for the
        > peer's response. This way resources can be saved, as the process can
        > already terminate or serve another connection. This should only be
        > done when it is known that the other side will not send more data,
        > otherwise there is a risk of a truncation attack.

        References:
            https://github.com/michael-lazar/jetforce/issues/32
            https://www.openssl.org/docs/man1.1.1/man3/SSL_shutdown.html
        """
        # Send the TLS close_notify alert and flush the write buffer. If the
        # client has already closed their end of the stream, this will also
        # close the underlying TCP connection.
        self.transport.loseConnection()

        # Ensure that the underlying connection will always be closed. There is
        # no harm in calling this method twice if it was already invoked as
        # part of the above TLS shutdown.
        self.transport.transport.loseConnection()

    async def _handle_request_noblock(self) -> None:
        """
        Handle the gemini request and write the raw response to the socket.

        This method is implemented using an async coroutine, which has been
        supported by twisted since python 3.5 by wrapping the method in
        ensureDeferred().

        There are two places that we call into the "application" code:

        1. The initial invoking of app(environ, write_callback) which will
           return an iterable.
        2. Every time that we call next() on the iterable to retrieve bytes to
           write to the response body.

        In both of these places, the app can either return the result directly,
        or it can return a "deferred" object, which is twisted's version of an
        asyncio future. The server will await on the result of this deferred,
        which yields control of the event loop for other requests to be handled
        concurrently.

        Raises:
            Exception: Whatever parse_header() raised for a malformed request.
                The error response has already been written and the connection
                closed by the time it propagates. Errors raised while running
                the application are logged and reported to the client instead
                of being propagated.
        """
        try:
            self.parse_header()
        except Exception:
            # Malformed request, throw it away and exit immediately
            self.server.log_message(traceback.format_exc())
            self.write_status(Status.BAD_REQUEST, "Malformed request")
            self.flush_status()
            self.finish_connection()
            # Note that log_request() is not called on this path. Re-raising
            # lets the failure propagate out of the coroutine to the caller.
            raise

        try:
            environ = self.build_environ()
            # The app receives write_status as its callback for setting the
            # response status line.
            response_generator = self.app(environ, self.write_status)
            if isinstance(response_generator, Deferred):
                # The app is asynchronous: wait for the actual iterable
                response_generator = await self.track_deferred(response_generator)
            else:
                # Yield control of the event loop
                deferred = deferLater(self.server.reactor, 0)
                await self.track_deferred(deferred)

            for data in response_generator:
                if isinstance(data, Deferred):
                    # Awaiting the deferred already yields to the event loop,
                    # so no extra deferLater() is needed in this branch.
                    data = await self.track_deferred(data)
                    self.write_body(data)
                else:
                    self.write_body(data)
                    # Yield control of the event loop
                    deferred = deferLater(self.server.reactor, 0)
                    await self.track_deferred(deferred)
        except CancelledError:
            # connectionLost() cancels the deferred being awaited, so this is
            # presumably a client disconnect. There is nothing to report; fall
            # through to the cleanup below.
            pass
        except Exception:
            self.server.log_message(traceback.format_exc())
            # If part of the response has already been sent, flush_status()
            # will discard this status line instead of writing it.
            self.write_status(Status.CGI_ERROR, "An unexpected error occurred")
        finally:
            # Runs on success, cancellation and error alike: make sure the
            # status line went out, record the request, and close the socket.
            self.flush_status()
            self.log_request()
            self.finish_connection()

    async def track_deferred(self, deferred: Deferred) -> typing.Union[str, bytes]:
        """
        Keep track of the deferred that we're waiting on so we can send an
        error back to it if the connection is abruptly killed.
        """
        self._currently_deferred = deferred
        try:
            return await deferred
        finally:
            self._currently_deferred = None

    def build_environ(self) -> EnvironDict:
        """
        Assemble the ``environ`` dictionary handed to the application.

        Variable names (mostly) follow the CGI spec defined in RFC 3875, and
        the TLS variable names borrow from the GLV-1.12556 server. The
        ``AUTH_TYPE``, ``REMOTE_USER`` and ``TLS_CLIENT_*`` keys are only
        present when the client presented a certificate; without one,
        ``client_certificate`` stays ``None``.
        """
        parsed_url = urllib.parse.urlparse(self.url)
        tls_conn = self.transport.getHandle()
        environ = {
            "GEMINI_URL": self.url,
            "HOSTNAME": self.server.hostname,
            "QUERY_STRING": parsed_url.query,
            "REMOTE_ADDR": self.client_addr.host,
            "REMOTE_HOST": self.client_addr.host,
            "SERVER_NAME": self.server.hostname,
            "SERVER_PORT": self.server.port,
            "SERVER_PROTOCOL": "GEMINI",
            "SERVER_SOFTWARE": f"jetforce/{__version__}",
            "TLS_CIPHER": tls_conn.get_cipher_name(),
            "TLS_VERSION": tls_conn.get_protocol_version_name(),
            "client_certificate": None,
        }

        peer_cert = self.transport.getPeerCertificate()
        if not peer_cert:
            # No client certificate: the base variables are all there is
            return environ

        x509_cert = peer_cert.to_cryptography()
        details = inspect_certificate(x509_cert)
        client_cert_vars = {
            "client_certificate": x509_cert,
            "AUTH_TYPE": "CERTIFICATE",
            "REMOTE_USER": details["common_name"],
            "TLS_CLIENT_HASH": details["fingerprint"],
            "TLS_CLIENT_HASH_B64": details["fingerprint_b64"],
            "TLS_CLIENT_NOT_BEFORE": details["not_before"],
            "TLS_CLIENT_NOT_AFTER": details["not_after"],
            "TLS_CLIENT_SERIAL_NUMBER": details["serial_number"],
            # Grab the value that was stashed during the TLS handshake
            "TLS_CLIENT_AUTHORISED": int(getattr(tls_conn, "authorised", 0)),
        }
        environ.update(client_cert_vars)
        return environ

    def parse_header(self) -> None:
        """
        Parse the gemini header line.

        The request is a single UTF-8 line formatted as: <URL>\r\n
        """
        if len(self.request) > 1024:
            raise ValueError("URL exceeds max length of 1024 bytes")

        self.url = self.request.decode()

    def write_status(self, status: int, meta: str) -> None:
        """
        Write the gemini status line to an internal buffer.

        The status line is a single UTF-8 line formatted as:
            <STATUS><SPACE><META><CR><LF>

        If the response status is 2, the meta field will contain the mimetype
        of the response data sent. If the status is something else, the meta
        will contain a descriptive message.

        The status is not written immediately, it's added to an internal buffer
        that must be flushed. This is done so that the status can be updated as
        long as no other data has been written to the stream yet.
        """
        self.status = status
        self.meta = meta
        self.response_buffer = f"{status} {meta}\r\n"

    def write_body(self, data: typing.Union[str, bytes, None]) -> None:
        """
        Write bytes to the gemini response body.
        """
        if data is None:
            return

        if isinstance(data, str):
            data = data.encode()

        self.flush_status()
        self.response_size += len(data)
        if self.DEBUG:
            print(f"Writing body: {len(data)} bytes")
        self.transport.write(data)

    def flush_status(self) -> None:
        """
        Flush the status line from the internal buffer to the socket stream.
        """
        if self.response_buffer and not self.response_size:
            data = self.response_buffer.encode()
            self.response_size += len(data)
            if self.DEBUG:
                print(f"Writing status: {len(data)} bytes")
            self.transport.write(data)
        self.response_buffer = ""

    def log_request(self) -> None:
        """
        Log a gemini request using a format derived from the Common Log Format.
        """
        try:
            message = '{} [{}] "{}" {} {} {}'.format(
                self.client_addr.host,
                time.strftime(self.TIMESTAMP_FORMAT, self.connected_timestamp),
                self.url,
                self.status,
                self.meta,
                self.response_size,
            )
        except AttributeError:
            # The connection ended before we got far enough to log anything
            pass
        else:
            self.server.log_access(message)