💾 Archived View for tris.fyi › pydoc › linecache captured on 2022-07-16 at 14:55:30. Gemini links have been rewritten to link to archived content.
⬅️ Previous capture (2022-04-28)
-=-=-=-=-=-=-
Cache lines from Python source files. This is intended to read lines from imported modules -- hence, if a filename is not found, it will search the module search path for a file by that name.
checkcache(filename=None) Discard cache entries that are out of date. (This is not checked upon each call!)
clearcache() Clear the cache entirely.
getline(filename, lineno, module_globals=None) Get a line for a Python source file from the cache. Update the cache if it doesn't contain an entry for this file already.
getlines(filename, module_globals=None) Get the lines for a Python source file from the cache. Update the cache if it doesn't contain an entry for this file already.
lazycache(filename, module_globals) Seed the cache for filename with module_globals. The module loader will be asked for the source only when getlines is called, not immediately. If there is an entry in the cache already, it is not altered. Returns True if a lazy load is registered in the cache, otherwise False. To register such a load, a module loader with a get_source method must be found, the filename must be a cacheable filename, and the filename must not be already cached.
updatecache(filename, module_globals=None) Update a cache entry and return its list of lines. If something's wrong, print a message, discard the cache entry, and return an empty list.
cache = {'/nix/store/yycyafj9l8c4f6mkqng6xy8f23i7rhwm-amethyst-0.0.1/lib/python3.9/site-packages/amethyst/tls.py': (3130, 1.0, ['import datetime\n', 'import os.path\n', 'import logging\n', 'import ssl\n', 'import traceback\n', '\n', 'from cryptography import x509\n', 'from cryptography.x509.oid import NameOID\n', 'from cryptography.hazmat.primitives import hashes, serialization\n', 'from cryptography.hazmat.primitives.asymmetric import rsa\n', '\n', 'from typing import List, TYPE_CHECKING\n', '\n', 'if TYPE_CHECKING:\n', ' from .config import Config\n', '\n', '\n', 'log = logging.getLogger("amethyst.tls")\n', '\n', '\n', 'def make_partial_context():\n', ' c = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)\n', ' c.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1\n', ' c.options |= ssl.OP_SINGLE_DH_USE | ssl.OP_SINGLE_ECDH_USE\n', ' c.check_hostname = False\n', ' c.verify_mode = ssl.VerifyMode.CERT_OPTIONAL\n', ' return c\n', '\n', '\n', 'def make_context(cert_path: str, key_path: str):\n', ' c = make_partial_context()\n', ' c.load_cert_chain(cert_path, keyfile=key_path)\n', ' return c\n', '\n', '\n', 'def make_sni_context(config: "Config"):\n', ' def sni_callback(sock, host, _original_ctx):\n', ' for host_cfg in config.hosts:\n', ' if host_cfg.host == host:\n', ' break\n', ' else:\n', ' return ssl.ALERT_DESCRIPTION_HANDSHAKE_FAILURE\n', '\n', ' try:\n', ' sock.context = host_cfg.tls.get_ssl_context()\n', ' except Exception:\n', ' log.warn(f"When setting context after SNI; {traceback.format_exc()}")\n', '\n', ' c = make_partial_context()\n', ' c.sni_callback = sni_callback\n', ' return c\n', '\n', '\n', 'def update_certificate(cert_path: str, key_path: str, hosts: List[str]):\n', ' if os.path.exists(cert_path):\n', ' with open(cert_path, "rb") as f:\n', ' cert = x509.load_pem_x509_certificate(f.read())\n', '\n', ' if cert.not_valid_after > datetime.datetime.now():\n', ' log.info("Certificate exists and is unexpired; skipping regeneration.")\n', ' return 
cert.not_valid_after\n', '\n', ' else:\n', ' log.info("Certificate expired; regenerating.")\n', '\n', ' else:\n', ' log.info("Certificate does not exist yet, generating one now.")\n', '\n', ' key = rsa.generate_private_key(\n', ' public_exponent=65537,\n', ' key_size=4096,\n', ' )\n', '\n', ' with open(key_path, "wb") as f:\n', ' f.write(\n', ' key.private_bytes(\n', ' encoding=serialization.Encoding.PEM,\n', ' format=serialization.PrivateFormat.TraditionalOpenSSL,\n', ' encryption_algorithm=serialization.NoEncryption(),\n', ' )\n', ' )\n', '\n', ' subject = issuer = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, hosts[0])])\n', '\n', ' cert = (\n', ' x509.CertificateBuilder()\n', ' .subject_name(subject)\n', ' .issuer_name(issuer)\n', ' .public_key(key.public_key())\n', ' .serial_number(x509.random_serial_number())\n', ' .not_valid_before(datetime.datetime.utcnow() - datetime.timedelta(days=1))\n', ' .not_valid_after(datetime.datetime.utcnow() + datetime.timedelta(days=30))\n', ' .add_extension(\n', ' x509.SubjectAlternativeName([x509.DNSName(host) for host in hosts]),\n', ' critical=False,\n', ' )\n', ' .sign(key, hashes.SHA256())\n', ' )\n', '\n', ' with open(cert_path, "wb") as f:\n', ' f.write(cert.public_bytes(serialization.Encoding.PEM))\n', '\n', ' log.info("Success! 
Certificate generated and saved.")\n', ' return cert.not_valid_after\n'], '/nix/store/yycyafj9l8c4f6mkqng6xy8f23i7rhwm-amethyst-0.0.1/lib/python3.9/site-packages/amethyst/tls.py'), '/nix/store/yycyafj9l8c4f6mkqng6xy8f23i7rhwm-amethyst-0.0.1/lib/python3.9/site-packages/amethyst/config.py': (2800, 1.0, ['import datetime\n', 'import ssl\n', '\n', 'from dataclasses import dataclass\n', 'from typing import Dict, List, Optional, Tuple\n', '\n', 'from .handler import GenericHandler, Handler\n', 'from .resource import Resource\n', 'from .resource_registry import registry\n', '\n', 'import os\n', '\n', '\n', '@dataclass\n', 'class TLSConfig:\n', ' host: str\n', ' auto: bool = False\n', ' cert_path: Optional[str] = None\n', ' key_path: Optional[str] = None\n', '\n', ' _context_cache: Optional[Tuple[datetime.datetime, ssl.SSLContext]] = None\n', '\n', ' @classmethod\n', ' def from_config(cls, host, cfg):\n', ' o = cls(host)\n', '\n', ' state = os.getenv("STATE_DIRECTORY", ".")\n', '\n', ' o.auto = cfg.get("auto", True)\n', '\n', ' o.cert_path = cfg.get("cert_path", None)\n', ' if o.cert_path is None:\n', ' o.cert_path = os.path.join(state, f"{host}.cert.pem")\n', '\n', ' o.key_path = cfg.get("key_path", None)\n', ' if o.key_path is None:\n', ' o.key_path = os.path.join(state, f"{host}.key.pem")\n', '\n', ' return o\n', '\n', ' def clear_context_cache(self):\n', ' self._context_cache = None\n', '\n', ' def get_ssl_context(self):\n', ' from . 
import tls\n', '\n', ' if self._context_cache is not None:\n', ' expires, context = self._context_cache\n', '\n', ' if expires is None or expires > datetime.datetime.now():\n', ' return context\n', '\n', ' if self.auto:\n', ' expires = tls.update_certificate(self.cert_path, self.key_path, [self.host])\n', '\n', ' else:\n', ' # We want to keep using a manually-specified certificate forever\n', ' # or at least until the server is restarted / HUPed.\n', ' expires = None\n', '\n', ' context = tls.make_context(self.cert_path, self.key_path)\n', '\n', ' self._context_cache = expires, context\n', ' return context\n', '\n', '\n', '@dataclass\n', 'class HostConfig:\n', ' host: str\n', ' tls: TLSConfig\n', ' path_map: Dict[str, Resource]\n', '\n', ' @classmethod\n', ' def _construct_resource(cls, cfg) -> Resource:\n', ' resource_type = cfg.pop("type", "filesystem")\n', ' return registry[resource_type](**cfg)\n', '\n', ' @classmethod\n', ' def from_config(cls, cfg):\n', ' host = cfg["name"]\n', ' tls = TLSConfig.from_config(host, cfg.get("tls", {}))\n', ' path_map = {\n', ' path: cls._construct_resource(config)\n', ' for path, config in cfg["paths"].items()\n', ' }\n', '\n', ' return cls(host, tls, path_map)\n', '\n', '\n', '@dataclass\n', 'class Config:\n', ' hosts: List[HostConfig]\n', ' handler: Handler\n', ' port: int = 1965\n', '\n', ' def load(self, cfg):\n', ' self.hosts = [HostConfig.from_config(host) for host in cfg.get("hosts", [])]\n', '\n', ' if not self.hosts:\n', ' raise ValueError("Server can\'t run without any hosts!")\n', '\n', ' self.handler = GenericHandler({host.host: host.path_map for host in self.hosts})\n', '\n', ' @classmethod\n', ' def from_config(cls, cfg):\n', ' o = cls([], None, cfg.get("port", 1965))\n', ' o.load(cfg)\n', ' return o\n'], '/nix/store/yycyafj9l8c4f6mkqng6xy8f23i7rhwm-amethyst-0.0.1/lib/python3.9/site-packages/amethyst/config.py'), 
'/nix/store/yycyafj9l8c4f6mkqng6xy8f23i7rhwm-amethyst-0.0.1/lib/python3.9/site-packages/amethyst/server.py': (2435, 1.0, ['#!/usr/bin/env python3\n', '\n', 'import asyncio\n', 'import logging\n', 'import signal\n', 'import traceback\n', 'from typing import TYPE_CHECKING\n', '\n', 'from .response import Response, Status\n', 'from .tls import make_sni_context\n', '\n', 'if TYPE_CHECKING:\n', ' from .config import Config\n', '\n', '\n', 'class Server:\n', ' def __init__(\n', ' self,\n', ' config: "Config",\n', ' ):\n', ' self.log = logging.getLogger("amethyst.server")\n', ' self.access_log = logging.getLogger("amethyst.access")\n', '\n', ' self.server = None\n', ' self.config = config\n', '\n', ' self.ssl_context = make_sni_context(config)\n', ' self.server = self.get_server()\n', '\n', ' def get_server(self):\n', ' loop = asyncio.get_event_loop()\n', '\n', ' return asyncio.start_server(\n', ' self.handle_connection,\n', ' port=self.config.port,\n', ' ssl=self.ssl_context,\n', ' loop=loop,\n', ' )\n', '\n', ' async def handle_connection(self, reader, writer):\n', ' from .request import Connection\n', '\n', ' peer_addr = writer.get_extra_info("peername")\n', ' peer_cert = writer.get_extra_info("peercert")\n', '\n', ' self.log.debug(f"Received connection from {peer_addr}")\n', '\n', ' url = "-"\n', ' try:\n', ' url = (await reader.readuntil(b"\\r\\n")).rstrip(b"\\r\\n").decode()\n', '\n', ' if len(url) > 1024:\n', ' response = Response(Status.BAD_REQUEST, "URL too long!")\n', ' else:\n', ' response = await self.config.handler(\n', ' url, Connection(self, peer_addr, peer_cert)\n', ' )\n', '\n', ' except UnicodeDecodeError:\n', ' response = Response(Status.BAD_REQUEST, "URL must be UTF-8")\n', '\n', ' except Exception:\n', ' self.log.error(f"While generating response; {traceback.format_exc()}")\n', '\n', ' response = Response(\n', ' Status.TEMPORARY_FAILURE,\n', ' "Exception thrown during request processing; see server logs for details.",\n', ' )\n', '\n', ' 
self.access_log.info(\n', ' f"{url} {response.status_code.value}[{response.status_code.name}]"\n', ' f" {response.meta}"\n', ' )\n', '\n', ' try:\n', ' line = f"{response.status_code.value} {response.meta}\\r\\n".encode()\n', ' writer.write(line)\n', '\n', ' if response.status_code.is_success() and response.content is not None:\n', ' writer.write(response.content)\n', '\n', ' except Exception:\n', ' self.log.error(f"While writing response; {traceback.format_exc()}")\n', '\n', ' finally:\n', ' writer.close()\n'], '/nix/store/yycyafj9l8c4f6mkqng6xy8f23i7rhwm-amethyst-0.0.1/lib/python3.9/site-packages/amethyst/server.py'), '/nix/store/65h1mb8604dbw077762j702q6b8i0mpw-python3-3.9.13/lib/python3.9/asyncio/streams.py': (26656, 1.0, ['__all__ = (\n', " 'StreamReader', 'StreamWriter', 'StreamReaderProtocol',\n", " 'open_connection', 'start_server')\n", '\n', 'import socket\n', 'import sys\n', 'import warnings\n', 'import weakref\n', '\n', "if hasattr(socket, 'AF_UNIX'):\n", " __all__ += ('open_unix_connection', 'start_unix_server')\n", '\n', 'from . import coroutines\n', 'from . import events\n', 'from . import exceptions\n', 'from . import format_helpers\n', 'from . 
import protocols\n', 'from .log import logger\n', 'from .tasks import sleep\n', '\n', '\n', '_DEFAULT_LIMIT = 2 ** 16 # 64 KiB\n', '\n', '\n', 'async def open_connection(host=None, port=None, *,\n', ' loop=None, limit=_DEFAULT_LIMIT, **kwds):\n', ' """A wrapper for create_connection() returning a (reader, writer) pair.\n', '\n', ' The reader returned is a StreamReader instance; the writer is a\n', ' StreamWriter instance.\n', '\n', ' The arguments are all the usual arguments to create_connection()\n', ' except protocol_factory; most common are positional host and port,\n', ' with various optional keyword arguments following.\n', '\n', ' Additional optional keyword arguments are loop (to set the event loop\n', ' instance to use) and limit (to set the buffer limit passed to the\n', ' StreamReader).\n', '\n', ' (If you want to customize the StreamReader and/or\n', " StreamReaderProtocol classes, just copy the code -- there's\n", ' really nothing special here except some convenience.)\n', ' """\n', ' if loop is None:\n', ' loop = events.get_event_loop()\n', ' else:\n', ' warnings.warn("The loop argument is deprecated since Python 3.8, "\n', ' "and scheduled for removal in Python 3.10.",\n', ' DeprecationWarning, stacklevel=2)\n', ' reader = StreamReader(limit=limit, loop=loop)\n', ' protocol = StreamReaderProtocol(reader, loop=loop)\n', ' transport, _ = await loop.create_connection(\n', ' lambda: protocol, host, port, **kwds)\n', ' writer = StreamWriter(transport, protocol, reader, loop)\n', ' return reader, writer\n', '\n', '\n', 'async def start_server(client_connected_cb, host=None, port=None, *,\n', ' loop=None, limit=_DEFAULT_LIMIT, **kwds):\n', ' """Start a socket server, call back for each client connected.\n', '\n', ' The first parameter, `client_connected_cb`, takes two parameters:\n', ' client_reader, client_writer. client_reader is a StreamReader\n', ' object, while client_writer is a StreamWriter object. 
This\n', ' parameter can either be a plain callback function or a coroutine;\n', ' if it is a coroutine, it will be automatically converted into a\n', ' Task.\n', '\n', ' The rest of the arguments are all the usual arguments to\n', ' loop.create_server() except protocol_factory; most common are\n', ' positional host and port, with various optional keyword arguments\n', ' following. The return value is the same as loop.create_server().\n', '\n', ' Additional optional keyword arguments are loop (to set the event loop\n', ' instance to use) and limit (to set the buffer limit passed to the\n', ' StreamReader).\n', '\n', ' The return value is the same as loop.create_server(), i.e. a\n', ' Server object which can be used to stop the service.\n', ' """\n', ' if loop is None:\n', ' loop = events.get_event_loop()\n', ' else:\n', ' warnings.warn("The loop argument is deprecated since Python 3.8, "\n', ' "and scheduled for removal in Python 3.10.",\n', ' DeprecationWarning, stacklevel=2)\n', '\n', ' def factory():\n', ' reader = StreamReader(limit=limit, loop=loop)\n', ' protocol = StreamReaderProtocol(reader, client_connected_cb,\n', ' loop=loop)\n', ' return protocol\n', '\n', ' return await loop.create_server(factory, host, port, **kwds)\n', '\n', '\n', "if hasattr(socket, 'AF_UNIX'):\n", ' # UNIX Domain Sockets are supported on this platform\n', '\n', ' async def open_unix_connection(path=None, *,\n', ' loop=None, limit=_DEFAULT_LIMIT, **kwds):\n', ' """Similar to `open_connection` but works with UNIX Domain Sockets."""\n', ' if loop is None:\n', ' loop = events.get_event_loop()\n', ' else:\n', ' warnings.warn("The loop argument is deprecated since Python 3.8, "\n', ' "and scheduled for removal in Python 3.10.",\n', ' DeprecationWarning, stacklevel=2)\n', ' reader = StreamReader(limit=limit, loop=loop)\n', ' protocol = StreamReaderProtocol(reader, loop=loop)\n', ' transport, _ = await loop.create_unix_connection(\n', ' lambda: protocol, path, **kwds)\n', ' writer = 
StreamWriter(transport, protocol, reader, loop)\n', ' return reader, writer\n', '\n', ' async def start_unix_server(client_connected_cb, path=None, *,\n', ' loop=None, limit=_DEFAULT_LIMIT, **kwds):\n', ' """Similar to `start_server` but works with UNIX Domain Sockets."""\n', ' if loop is None:\n', ' loop = events.get_event_loop()\n', ' else:\n', ' warnings.warn("The loop argument is deprecated since Python 3.8, "\n', ' "and scheduled for removal in Python 3.10.",\n', ' DeprecationWarning, stacklevel=2)\n', '\n', ' def factory():\n', ' reader = StreamReader(limit=limit, loop=loop)\n', ' protocol = StreamReaderProtocol(reader, client_connected_cb,\n', ' loop=loop)\n', ' return protocol\n', '\n', ' return await loop.create_unix_server(factory, path, **kwds)\n', '\n', '\n', 'class FlowControlMixin(protocols.Protocol):\n', ' """Reusable flow control logic for StreamWriter.drain().\n', '\n', ' This implements the protocol methods pause_writing(),\n', ' resume_writing() and connection_lost(). 
If the subclass overrides\n', ' these it must call the super methods.\n', '\n', ' StreamWriter.drain() must wait for _drain_helper() coroutine.\n', ' """\n', '\n', ' def __init__(self, loop=None):\n', ' if loop is None:\n', ' self._loop = events.get_event_loop()\n', ' else:\n', ' self._loop = loop\n', ' self._paused = False\n', ' self._drain_waiter = None\n', ' self._connection_lost = False\n', '\n', ' def pause_writing(self):\n', ' assert not self._paused\n', ' self._paused = True\n', ' if self._loop.get_debug():\n', ' logger.debug("%r pauses writing", self)\n', '\n', ' def resume_writing(self):\n', ' assert self._paused\n', ' self._paused = False\n', ' if self._loop.get_debug():\n', ' logger.debug("%r resumes writing", self)\n', '\n', ' waiter = self._drain_waiter\n', ' if waiter is not None:\n', ' self._drain_waiter = None\n', ' if not waiter.done():\n', ' waiter.set_result(None)\n', '\n', ' def connection_lost(self, exc):\n', ' self._connection_lost = True\n', ' # Wake up the writer if currently paused.\n', ' if not self._paused:\n', ' return\n', ' waiter = self._drain_waiter\n', ' if waiter is None:\n', ' return\n', ' self._drain_waiter = None\n', ' if waiter.done():\n', ' return\n', ' if exc is None:\n', ' waiter.set_result(None)\n', ' else:\n', ' waiter.set_exception(exc)\n', '\n', ' async def _drain_helper(self):\n', ' if self._connection_lost:\n', " raise ConnectionResetError('Connection lost')\n", ' if not self._paused:\n', ' return\n', ' waiter = self._drain_waiter\n', ' assert waiter is None or waiter.cancelled()\n', ' waiter = self._loop.create_future()\n', ' self._drain_waiter = waiter\n', ' await waiter\n', '\n', ' def _get_close_waiter(self, stream):\n', ' raise NotImplementedError\n', '\n', '\n', 'class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):\n', ' """Helper class to adapt between Protocol and StreamReader.\n', '\n', ' (This is a helper class instead of making StreamReader itself a\n', ' Protocol subclass, because the 
StreamReader has other potential\n', ' uses, and to prevent the user of the StreamReader to accidentally\n', ' call inappropriate methods of the protocol.)\n', ' """\n', '\n', ' _source_traceback = None\n', '\n', ' def __init__(self, stream_reader, client_connected_cb=None, loop=None):\n', ' super().__init__(loop=loop)\n', ' if stream_reader is not None:\n', ' self._stream_reader_wr = weakref.ref(stream_reader)\n', ' self._source_traceback = stream_reader._source_traceback\n', ' else:\n', ' self._stream_reader_wr = None\n', ' if client_connected_cb is not None:\n', ' # This is a stream created by the `create_server()` function.\n', ' # Keep a strong reference to the reader until a connection\n', ' # is established.\n', ' self._strong_reader = stream_reader\n', ' self._reject_connection = False\n', ' self._stream_writer = None\n', ' self._transport = None\n', ' self._client_connected_cb = client_connected_cb\n', ' self._over_ssl = False\n', ' self._closed = self._loop.create_future()\n', '\n', ' @property\n', ' def _stream_reader(self):\n', ' if self._stream_reader_wr is None:\n', ' return None\n', ' return self._stream_reader_wr()\n', '\n', ' def connection_made(self, transport):\n', ' if self._reject_connection:\n', ' context = {\n', " 'message': ('An open stream was garbage collected prior to '\n", " 'establishing network connection; '\n", ' \'call "stream.close()" explicitly.\')\n', ' }\n', ' if self._source_traceback:\n', " context['source_traceback'] = self._source_traceback\n", ' self._loop.call_exception_handler(context)\n', ' transport.abort()\n', ' return\n', ' self._transport = transport\n', ' reader = self._stream_reader\n', ' if reader is not None:\n', ' reader.set_transport(transport)\n', " self._over_ssl = transport.get_extra_info('sslcontext') is not None\n", ' if self._client_connected_cb is not None:\n', ' self._stream_writer = StreamWriter(transport, self,\n', ' reader,\n', ' self._loop)\n', ' res = self._client_connected_cb(reader,\n', ' 
self._stream_writer)\n', ' if coroutines.iscoroutine(res):\n', ' self._loop.create_task(res)\n', ' self._strong_reader = None\n', '\n', ' def connection_lost(self, exc):\n', ' reader = self._stream_reader\n', ' if reader is not None:\n', ' if exc is None:\n', ' reader.feed_eof()\n', ' else:\n', ' reader.set_exception(exc)\n', ' if not self._closed.done():\n', ' if exc is None:\n', ' self._closed.set_result(None)\n', ' else:\n', ' self._closed.set_exception(exc)\n', ' super().connection_lost(exc)\n', ' self._stream_reader_wr = None\n', ' self._stream_writer = None\n', ' self._transport = None\n', '\n', ' def data_received(self, data):\n', ' reader = self._stream_reader\n', ' if reader is not None:\n', ' reader.feed_data(data)\n', '\n', ' def eof_received(self):\n', ' reader = self._stream_reader\n', ' if reader is not None:\n', ' reader.feed_eof()\n', ' if self._over_ssl:\n', ' # Prevent a warning in SSLProtocol.eof_received:\n', ' # "returning true from eof_received()\n', ' # has no effect when using ssl"\n', ' return False\n', ' return True\n', '\n', ' def _get_close_waiter(self, stream):\n', ' return self._closed\n', '\n', ' def __del__(self):\n', ' # Prevent reports about unhandled exceptions.\n', ' # Better than self._closed._log_traceback = False hack\n', ' closed = self._closed\n', ' if closed.done() and not closed.cancelled():\n', ' closed.exception()\n', '\n', '\n', 'class StreamWriter:\n', ' """Wraps a Transport.\n', '\n', ' This exposes write(), writelines(), [can_]write_eof(),\n', ' get_extra_info() and close(). It adds drain() which returns an\n', ' optional Future on which you can wait for flow control. 
It also\n', ' adds a transport property which references the Transport\n', ' directly.\n', ' """\n', '\n', ' def __init__(self, transport, protocol, reader, loop):\n', ' self._transport = transport\n', ' self._protocol = protocol\n', ' # drain() expects that the reader has an exception() method\n', ' assert reader is None or isinstance(reader, StreamReader)\n', ' self._reader = reader\n', ' self._loop = loop\n', ' self._complete_fut = self._loop.create_future()\n', ' self._complete_fut.set_result(None)\n', '\n', ' def __repr__(self):\n', " info = [self.__class__.__name__, f'transport={self._transport!r}']\n", ' if self._reader is not None:\n', " info.append(f'reader={self._reader!r}')\n", " return '<{}>'.format(' '.join(info))\n", '\n', ' @property\n', ' def transport(self):\n', ' return self._transport\n', '\n', ' def write(self, data):\n', ' self._transport.write(data)\n', '\n', ' def writelines(self, data):\n', ' self._transport.writelines(data)\n', '\n', ' def write_eof(self):\n', ' return self._transport.write_eof()\n', '\n', ' def can_write_eof(self):\n', ' return self._transport.can_write_eof()\n', '\n', ' def close(self):\n', ' return self._transport.close()\n', '\n', ' def is_closing(self):\n', ' return self._transport.is_closing()\n', '\n', ' async def wait_closed(self):\n', ' await self._protocol._get_close_waiter(self)\n', '\n', ' def get_extra_info(self, name, default=None):\n', ' return self._transport.get_extra_info(name, default)\n', '\n', ' async def drain(self):\n', ' """Flush the write buffer.\n', '\n', ' The intended use is to write\n', '\n', ' w.write(data)\n', ' await w.drain()\n', ' """\n', ' if self._reader is not None:\n', ' exc = self._reader.exception()\n', ' if exc is not None:\n', ' raise exc\n', ' if self._transport.is_closing():\n', ' # Wait for protocol.connection_lost() call\n', ' # Raise connection closing error if any,\n', ' # ConnectionResetError otherwise\n', ' # Yield to the event loop so connection_lost() may be\n', ' # 
called. Without this, _drain_helper() would return\n', ' # immediately, and code that calls\n', ' # write(...); await drain()\n', ' # in a loop would never call connection_lost(), so it\n', ' # would not see an error when the socket is closed.\n', ' await sleep(0)\n', ' await self._protocol._drain_helper()\n', '\n', '\n', 'class StreamReader:\n', '\n', ' _source_traceback = None\n', '\n', ' def __init__(self, limit=_DEFAULT_LIMIT, loop=None):\n', ' # The line length limit is a security feature;\n', ' # it also doubles as half the buffer limit.\n', '\n', ' if limit <= 0:\n', " raise ValueError('Limit cannot be <= 0')\n", '\n', ' self._limit = limit\n', ' if loop is None:\n', ' self._loop = events.get_event_loop()\n', ' else:\n', ' self._loop = loop\n', ' self._buffer = bytearray()\n', " self._eof = False # Whether we're done.\n", ' self._waiter = None # A future used by _wait_for_data()\n', ' self._exception = None\n', ' self._transport = None\n', ' self._paused = False\n', ' if self._loop.get_debug():\n', ' self._source_traceback = format_helpers.extract_stack(\n', ' sys._getframe(1))\n', '\n', ' def __repr__(self):\n', " info = ['StreamReader']\n", ' if self._buffer:\n', " info.append(f'{len(self._buffer)} bytes')\n", ' if self._eof:\n', " info.append('eof')\n", ' if self._limit != _DEFAULT_LIMIT:\n', " info.append(f'limit={self._limit}')\n", ' if self._waiter:\n', " info.append(f'waiter={self._waiter!r}')\n", ' if self._exception:\n', " info.append(f'exception={self._exception!r}')\n", ' if self._transport:\n', " info.append(f'transport={self._transport!r}')\n", ' if self._paused:\n', " info.append('paused')\n", " return '<{}>'.format(' '.join(info))\n", '\n', ' def exception(self):\n', ' return self._exception\n', '\n', ' def set_exception(self, exc):\n', ' self._exception = exc\n', '\n', ' waiter = self._waiter\n', ' if waiter is not None:\n', ' self._waiter = None\n', ' if not waiter.cancelled():\n', ' waiter.set_exception(exc)\n', '\n', ' def 
_wakeup_waiter(self):\n', ' """Wakeup read*() functions waiting for data or EOF."""\n', ' waiter = self._waiter\n', ' if waiter is not None:\n', ' self._waiter = None\n', ' if not waiter.cancelled():\n', ' waiter.set_result(None)\n', '\n', ' def set_transport(self, transport):\n', " assert self._transport is None, 'Transport already set'\n", ' self._transport = transport\n', '\n', ' def _maybe_resume_transport(self):\n', ' if self._paused and len(self._buffer) <= self._limit:\n', ' self._paused = False\n', ' self._transport.resume_reading()\n', '\n', ' def feed_eof(self):\n', ' self._eof = True\n', ' self._wakeup_waiter()\n', '\n', ' def at_eof(self):\n', ' """Return True if the buffer is empty and \'feed_eof\' was called."""\n', ' return self._eof and not self._buffer\n', '\n', ' def feed_data(self, data):\n', " assert not self._eof, 'feed_data after feed_eof'\n", '\n', ' if not data:\n', ' return\n', '\n', ' self._buffer.extend(data)\n', ' self._wakeup_waiter()\n', '\n', ' if (self._transport is not None and\n', ' not self._paused and\n', ' len(self._buffer) > 2 * self._limit):\n', ' try:\n', ' self._transport.pause_reading()\n', ' except NotImplementedError:\n', " # The transport can't be paused.\n", " # We'll just have to buffer all data.\n", " # Forget the transport so we don't keep trying.\n", ' self._transport = None\n', ' else:\n', ' self._paused = True\n', '\n', ' async def _wait_for_data(self, func_name):\n', ' """Wait until feed_data() or feed_eof() is called.\n', '\n', ' If stream was paused, automatically resume it.\n', ' """\n', ' # StreamReader uses a future to link the protocol feed_data() method\n', ' # to a read coroutine. Running two read coroutines at the same time\n', ' # would have an unexpected behaviour. 
It would not possible to know\n', ' # which coroutine would get the next data.\n', ' if self._waiter is not None:\n', ' raise RuntimeError(\n', " f'{func_name}() called while another coroutine is '\n", " f'already waiting for incoming data')\n", '\n', " assert not self._eof, '_wait_for_data after EOF'\n", '\n', ' # Waiting for data while paused will make deadlock, so prevent it.\n', ' # This is essential for readexactly(n) for case when n > self._limit.\n', ' if self._paused:\n', ' self._paused = False\n', ' self._transport.resume_reading()\n', '\n', ' self._waiter = self._loop.create_future()\n', ' try:\n', ' await self._waiter\n', ' finally:\n', ' self._waiter = None\n', '\n', ' async def readline(self):\n', ' """Read chunk of data from the stream until newline (b\'\\n\') is found.\n', '\n', ' On success, return chunk that ends with newline. If only partial\n', ' line can be read due to EOF, return incomplete line without\n', ' terminating newline. When EOF was reached while no bytes read, empty\n', ' bytes object is returned.\n', '\n', ' If limit is reached, ValueError will be raised. In that case, if\n', ' newline was found, complete line including newline will be removed\n', ' from internal buffer. Else, internal buffer will be cleared. 
Limit is\n', ' compared against part of the line without newline.\n', '\n', ' If stream was paused, this function will automatically resume it if\n', ' needed.\n', ' """\n', " sep = b'\\n'\n", ' seplen = len(sep)\n', ' try:\n', ' line = await self.readuntil(sep)\n', ' except exceptions.IncompleteReadError as e:\n', ' return e.partial\n', ' except exceptions.LimitOverrunError as e:\n', ' if self._buffer.startswith(sep, e.consumed):\n', ' del self._buffer[:e.consumed + seplen]\n', ' else:\n', ' self._buffer.clear()\n', ' self._maybe_resume_transport()\n', ' raise ValueError(e.args[0])\n', ' return line\n', '\n', " async def readuntil(self, separator=b'\\n'):\n", ' """Read data from the stream until ``separator`` is found.\n', '\n', ' On success, the data and separator will be removed from the\n', ' internal buffer (consumed). Returned data will include the\n', ' separator at the end.\n', '\n', ' Configured stream limit is used to check result. Limit sets the\n', ' maximal length of data that can be returned, not counting the\n', ' separator.\n', '\n', ' If an EOF occurs and the complete separator is still not found,\n', ' an IncompleteReadError exception will be raised, and the internal\n', ' buffer will be reset. The IncompleteReadError.partial attribute\n', ' may contain the separator partially.\n', '\n', ' If the data cannot be read because of over limit, a\n', ' LimitOverrunError exception will be raised, and the data\n', ' will be left in the internal buffer, so it can be read again.\n', ' """\n', ' seplen = len(separator)\n', ' if seplen == 0:\n', " raise ValueError('Separator should be at least one-byte string')\n", '\n', ' if self._exception is not None:\n', ' raise self._exception\n', '\n', ' # Consume whole buffer except last bytes, which length is\n', " # one less than seplen. Let's check corner cases with\n", " # separator='SEPARATOR':\n", ' # * we have received almost complete separator (without last\n', " # byte). i.e buffer='some textSEPARATO'. 
In this case we\n", ' # can safely consume len(separator) - 1 bytes.\n', ' # * last byte of buffer is first byte of separator, i.e.\n', " # buffer='abcdefghijklmnopqrS'. We may safely consume\n", ' # everything except that last byte, but this require to\n', ' # analyze bytes of buffer that match partial separator.\n', ' # This is slow and/or require FSM. For this case our\n', ' # implementation is not optimal, since require rescanning\n', ' # of data that is known to not belong to separator. In\n', ' # real world, separator will not be so long to notice\n', ' # performance problems. Even when reading MIME-encoded\n', ' # messages :)\n', '\n', ' # `offset` is the number of bytes from the beginning of the buffer\n', ' # where there is no occurrence of `separator`.\n', ' offset = 0\n', '\n', ' # Loop until we find `separator` in the buffer, exceed the buffer size,\n', ' # or an EOF has happened.\n', ' while True:\n', ' buflen = len(self._buffer)\n', '\n', ' # Check if we now have enough data in the buffer for `separator` to\n', ' # fit.\n', ' if buflen - offset >= seplen:\n', ' isep = self._buffer.find(separator, offset)\n', '\n', ' if isep != -1:\n', ' # `separator` is in the buffer. `isep` will be used later\n', ' # to retrieve the data.\n', ' break\n', '\n', ' # see upper comment for explanation.\n', ' offset = buflen + 1 - seplen\n', ' if offset > self._limit:\n', ' raise exceptions.LimitOverrunError(\n', " 'Separator is not found, and chunk exceed the limit',\n", ' offset)\n', '\n', ' # Complete message (with full separator) may be present in buffer\n', ' # even when EOF flag is set. This may happen when the last chunk\n', " # adds data which makes separator be found. 
That's why we check for\n", ' # EOF *ater* inspecting the buffer.\n', ' if self._eof:\n', ' chunk = bytes(self._buffer)\n', ' self._buffer.clear()\n', ' raise exceptions.IncompleteReadError(chunk, None)\n', '\n', ' # _wait_for_data() will resume reading if stream was paused.\n', " await self._wait_for_data('readuntil')\n", '\n', ' if isep > self._limit:\n', ' raise exceptions.LimitOverrunError(\n', " 'Separator is found, but chunk is longer than limit', isep)\n", '\n', ' chunk = self._buffer[:isep + seplen]\n', ' del self._buffer[:isep + seplen]\n', ' self._maybe_resume_transport()\n', ' return bytes(chunk)\n', '\n', ' async def read(self, n=-1):\n', ' """Read up to `n` bytes from the stream.\n', '\n', ' If n is not provided, or set to -1, read until EOF and return all read\n', ' bytes. If the EOF was received and the internal buffer is empty, return\n', ' an empty bytes object.\n', '\n', ' If n is zero, return empty bytes object immediately.\n', '\n', ' If n is positive, this function try to read `n` bytes, and may return\n', ' less or equal bytes than requested, but at least one byte. If EOF was\n', ' received before any byte is read, this function returns empty byte\n', ' object.\n', '\n', ' Returned value is not limited with limit, configured at stream\n', ' creation.\n', '\n', ' If stream was paused, this function will automatically resume it if\n', ' needed.\n', ' """\n', '\n', ' if self._exception is not None:\n', ' raise self._exception\n', '\n', ' if n == 0:\n', " return b''\n", '\n', ' if n < 0:\n', ' # This used to just loop creating a new waiter hoping to\n', ' # collect everything in self._buffer, but that would\n', ' # deadlock if the subprocess sends more than self.limit\n', ' # bytes. 
So just call self.read(self._limit) until EOF.\n', ' blocks = []\n', ' while True:\n', ' block = await self.read(self._limit)\n', ' if not block:\n', ' break\n', ' blocks.append(block)\n', " return b''.join(blocks)\n", '\n', ' if not self._buffer and not self._eof:\n', " await self._wait_for_data('read')\n", '\n', ' # This will work right even if buffer is less than n bytes\n', ' data = bytes(self._buffer[:n])\n', ' del self._buffer[:n]\n', '\n', ' self._maybe_resume_transport()\n', ' return data\n', '\n', ' async def readexactly(self, n):\n', ' """Read exactly `n` bytes.\n', '\n', ' Raise an IncompleteReadError if EOF is reached before `n` bytes can be\n', ' read. The IncompleteReadError.partial attribute of the exception will\n', ' contain the partial read bytes.\n', '\n', ' if n is zero, return empty bytes object.\n', '\n', ' Returned value is not limited with limit, configured at stream\n', ' creation.\n', '\n', ' If stream was paused, this function will automatically resume it if\n', ' needed.\n', ' """\n', ' if n < 0:\n', " raise ValueError('readexactly size can not be less than zero')\n", '\n', ' if self._exception is not None:\n', ' raise self._exception\n', '\n', ' if n == 0:\n', " return b''\n", '\n', ' while len(self._buffer) < n:\n', ' if self._eof:\n', ' incomplete = bytes(self._buffer)\n', ' self._buffer.clear()\n', ' raise exceptions.IncompleteReadError(incomplete, n)\n', '\n', " await self._wait_for_data('readexactly')\n", '\n', ' if len(self._buffer) == n:\n', ' data = bytes(self._buffer)\n', ' self._buffer.clear()\n', ' else:\n', ' data = bytes(self._buffer[:n])\n', ' del self._buffer[:n]\n', ' self._maybe_resume_transport()\n', ' return data\n', '\n', ' def __aiter__(self):\n', ' return self\n', '\n', ' async def __anext__(self):\n', ' val = await self.readline()\n', " if val == b'':\n", ' raise StopAsyncIteration\n', ' return val\n'], '/nix/store/65h1mb8604dbw077762j702q6b8i0mpw-python3-3.9.13/lib/python3.9/asyncio/streams.py'), 
'/nix/store/yycyafj9l8c4f6mkqng6xy8f23i7rhwm-amethyst-0.0.1/lib/python3.9/site-packages/amethyst/handler.py': (2526, 1.0, ['from .resource import Resource\n', 'from .response import Status, Response\n', 'from .request import Connection, Context\n', 'from .util import get_path_components\n', 'from urllib.parse import urlparse\n', 'from typing import Dict, Callable, Awaitable\n', '\n', 'import logging\n', 'import re\n', '\n', 'Handler = Callable[[str, Connection], Awaitable[Response]]\n', 'PORT_RE = re.compile(r":([0-9]{1,5})$")\n', '\n', '\n', 'class GenericHandler:\n', ' def __init__(self, url_map: Dict[str, Dict[str, Resource]]):\n', ' self.url_map = url_map\n', ' self.log = logging.getLogger("amethyst.handler.GenericHandler")\n', '\n', ' async def __call__(self, url: str, conn: Connection) -> Response:\n', ' result = urlparse(url)\n', '\n', ' if not result.scheme:\n', ' return Response(Status.BAD_REQUEST, f"Requested URL must have a scheme.")\n', '\n', ' if result.scheme != "gemini":\n', ' # This is exclusively a Gemini server.\n', ' return Response(\n', ' Status.PROXY_REQUEST_REFUSED,\n', ' f"This server does not proxy non-Gemini URLs.",\n', ' )\n', '\n', ' host = result.netloc\n', '\n', ' if port_match := PORT_RE.search(host):\n', ' if int(port_match.group(1)) != conn.server.config.port:\n', ' return Response(\n', ' Status.PROXY_REQUEST_REFUSED, f"{host} is not served here."\n', ' )\n', '\n', ' host = PORT_RE.sub("", host)\n', '\n', ' if host not in self.url_map:\n', ' self.log.warn(f"Received request for host {host} not in URL map")\n', '\n', ' return Response(\n', ' Status.PROXY_REQUEST_REFUSED,\n', ' f"{host} is not served here.",\n', ' )\n', '\n', ' req_path = result.path\n', ' try:\n', ' req_path = get_path_components(req_path)\n', ' except ValueError:\n', ' return Response(Status.BAD_REQUEST, "Invalid URL")\n', '\n', ' paths = [(get_path_components(i), v) for i, v in self.url_map[host].items()]\n', '\n', ' for path, resource in sorted(paths, key=lambda k: 
len(k[0]), reverse=True):\n', ' if len(req_path) < len(path) or req_path[: len(path)] != path:\n', ' continue\n', '\n', ' truncated_path = "/".join(req_path[len(path) :])\n', ' if result.path.endswith("/"):\n', ' truncated_path += "/"\n', '\n', ' return await resource(\n', ' Context(\n', ' result.netloc,\n', ' result.path,\n', ' truncated_path,\n', ' result.query,\n', ' conn,\n', ' )\n', ' )\n', '\n', ' return Response(Status.NOT_FOUND, f"{req_path} was not found on this server.")\n'], '/nix/store/yycyafj9l8c4f6mkqng6xy8f23i7rhwm-amethyst-0.0.1/lib/python3.9/site-packages/amethyst/handler.py'), '/nix/store/f8p87m3sfv6sz8scmyxnmsfkj3brh3fs-python3.9-amethyst_extensions-0.0.1/lib/python3.9/site-packages/amethyst_ext/pydoc.py': (6841, 1.0, ['from amethyst.response import Response, Status\n', '\n', 'import importlib\n', 'import inspect\n', 'import pkgutil\n', 'import re\n', 'import sys\n', 'import textwrap\n', '\n', 'SITE_PACKAGES_RE = re.compile(r"lib/python[^/]+/site-packages")\n', 'PYTHON3_RE = re.compile(r"python3[^-]*")\n', '\n', '\n', 'class PydocResource():\n', ' @staticmethod\n', ' def classify(thing):\n', ' if inspect.ismodule(thing):\n', ' return "module"\n', ' elif inspect.isclass(thing):\n', ' return "class"\n', ' elif (inspect.isfunction(thing) or inspect.ismethod(thing) or\n', ' inspect.ismethoddescriptor(thing) or inspect.isroutine(thing)):\n', ' return "function"\n', ' else:\n', ' return "other"\n', '\n', ' def doc_class(self, cls, name=None):\n', ' lines = []\n', '\n', ' if name is None:\n', ' name = cls.__name__\n', ' else:\n', ' name = f"{name}.{cls.__name__}"\n', '\n', ' lines.append(f"### {name}")\n', ' if (clsdoc := getattr(cls, "__doc__")):\n', ' lines.append(f"```\\n{clsdoc}\\n```\\n")\n', '\n', ' members = {}\n', ' members = {"class": [], "function": [], "other": []}\n', '\n', ' for name, member in inspect.getmembers(cls):\n', ' if name.startswith("_"):\n', ' continue\n', '\n', ' if (classification := self.classify(member)) in {"class", 
"function", "other"}:\n', ' members[classification].append((name, member))\n', '\n', ' members["class"].sort()\n', ' for _, scls in members["class"]:\n', ' lines.append(self.doc_class(scls, name))\n', '\n', ' members["function"].sort()\n', ' for name, func in members["function"]:\n', ' lines.append(self.doc_func(func))\n', '\n', ' members["other"].sort()\n', ' for name, other in members["other"]:\n', ' lines.append(self.doc_other(name, other))\n', '\n', ' return "\\n".join(lines)\n', '\n', ' def doc_func(self, func):\n', ' lines = []\n', '\n', ' lines.append("```")\n', ' try:\n', ' lines.append(f"{func.__name__}{inspect.signature(func)}")\n', ' except ValueError:\n', ' lines.append(f"{func.__name__}(...)")\n', '\n', ' if (funcdoc := getattr(func, "__doc__")):\n', ' lines.append(f"\\n{textwrap.indent(funcdoc, \' \')}\\n```\\n")\n', ' else:\n', ' lines.append("```\\n")\n', '\n', ' return "\\n".join(lines)\n', '\n', ' def doc_other(self, name, other):\n', ' doc = getattr(other, "__doc__", "")\n', ' if doc and doc != type(other).__doc__:\n', ' doc = textwrap.indent(doc, " ")\n', ' doc += "\\n```\\n"\n', ' else:\n', ' doc = "```"\n', '\n', ' return f"```\\n{name} = {other!r}\\n{doc}"\n', '\n', ' def doc_mod(self, modname):\n', ' lines = []\n', '\n', ' try:\n', ' module = importlib.import_module(modname)\n', ' except ImportError:\n', ' return None\n', '\n', ' ispkg = (getattr(module, "__package__", "") == modname)\n', '\n', ' lines.append("=> _ Back to module index")\n', ' lines.append("=> _/search Go to module by name")\n', ' if "." 
in modname:\n', ' components = modname.split(".")\n', ' for i in range(len(components) - 1, 0, -1):\n', ' lines.append("=> " + ".".join(components[:i]))\n', '\n', ' if ispkg:\n', ' lines.append(f"# {modname} (package)")\n', ' else:\n', ' lines.append(f"# {modname}")\n', '\n', ' if (moddoc := getattr(module, "__doc__")):\n', ' lines.append(f"```\\n{moddoc}\\n```")\n', ' else:\n', ' lines.append("This module has no docstring.")\n', '\n', ' members = {"module": [], "class": [], "function": [], "other": []}\n', ' for name, member in inspect.getmembers(module):\n', ' if name.startswith("_"):\n', ' continue\n', '\n', ' members[self.classify(member)].append((name, member))\n', '\n', ' if members["class"]:\n', ' members["class"].sort()\n', ' lines.append("## Classes")\n', ' for name, cls in members["class"]:\n', ' lines.append(self.doc_class(cls))\n', '\n', ' if members["function"]:\n', ' members["function"].sort()\n', ' lines.append("## Functions")\n', ' for name, func in members["function"]:\n', ' lines.append(f"### {name}")\n', ' lines.append(self.doc_func(func))\n', '\n', ' if members["other"]:\n', ' lines.append("## Other members")\n', ' members["other"].sort()\n', ' for name, other in members["other"]:\n', ' lines.append(self.doc_other(name, other))\n', '\n', ' if members["module"]:\n', ' members["module"].sort()\n', ' lines.append("## Modules")\n', ' for name, mod in members["module"]:\n', ' lines.append(f"=> {mod.__name__} {name}")\n', '\n', ' return "\\n".join(lines)\n', '\n', ' def index(self):\n', ' lines = []\n', '\n', ' lines.append("=> _/search Go to module by name")\n', '\n', ' lines.append("# Built-in modules")\n', ' names = [name for name in sys.builtin_module_names if name != "__main__"]\n', ' for name in sorted(names):\n', ' lines.append(f"=> {name}")\n', '\n', ' lines.append("# Python modules")\n', ' for dirname in sorted(sys.path):\n', ' display = dirname\n', ' if display.startswith("/nix/store/"):\n', ' display = f"(nix)/{display[44:]}"\n', '\n', ' 
display = SITE_PACKAGES_RE.sub("l/p/s-p", display)\n', ' display = PYTHON3_RE.sub("p3", display)\n', '\n', ' modpkgs = []\n', ' for importer, name, ispkg in pkgutil.iter_modules([dirname]):\n', ' if any((0xD800 <= ord(ch) <= 0xDFFF) for ch in name):\n', ' # Ignore modules that contain surrogate characters\n', ' # (pydoc does this)\n', ' continue\n', '\n', ' if name == "setup":\n', ' # never import "setup.py"\n', ' continue\n', '\n', ' modpkgs.append((name, ispkg))\n', '\n', ' if modpkgs:\n', ' lines.append(f"## {display}")\n', ' for name, ispkg in sorted(modpkgs):\n', ' if ispkg:\n', ' lines.append(f"=> {name} {name} (package)")\n', ' else:\n', ' lines.append(f"=> {name}")\n', '\n', ' return "\\n".join(lines)\n', '\n', '\n', ' async def __call__(self, ctx):\n', ' path = ctx.path\n', ' if not path:\n', ' return Response(Status.REDIRECT_PERMANENT, ctx.orig_path + "/")\n', '\n', ' path = path.strip("/")\n', ' if not path or path == "_":\n', ' text = self.index()\n', '\n', ' elif path == "_/search":\n', ' if ctx.query:\n', ' try:\n', ' importlib.import_module(ctx.query)\n', ' return Response(Status.REDIRECT_TEMPORARY, "../" + ctx.query)\n', ' except ImportError:\n', ' return Response(Status.INPUT, f"Sorry, I don\'t know about {ctx.query}. 
Module name?")\n', '\n', ' return Response(Status.INPUT, "Module name?")\n', ' else:\n', ' text = self.doc_mod(path)\n', '\n', ' if text is not None:\n', ' return Response(\n', ' Status.SUCCESS, "text/gemini", text.encode()\n', ' )\n', '\n', ' return Response(Status.NOT_FOUND, "text/gemini")\n'], '/nix/store/f8p87m3sfv6sz8scmyxnmsfkj3brh3fs-python3.9-amethyst_extensions-0.0.1/lib/python3.9/site-packages/amethyst_ext/pydoc.py'), '/nix/store/65h1mb8604dbw077762j702q6b8i0mpw-python3-3.9.13/lib/python3.9/pydoc.py': (109600, 1.0, ['#!/usr/bin/env python3\n', '"""Generate Python documentation in HTML or text for interactive use.\n', '\n', 'At the Python interactive prompt, calling help(thing) on a Python object\n', 'documents the object, and calling help() starts up an interactive\n', 'help session.\n', '\n', 'Or, at the shell command line outside of Python:\n', '\n', 'Run "pydoc <name>" to show documentation on something. <name> may be\n', 'the name of a function, module, package, or a dotted reference to a\n', 'class or function within a module or module in a package. If the\n', 'argument contains a path segment delimiter (e.g. slash on Unix,\n', 'backslash on Windows) it is treated as the path to a Python source file.\n', '\n', 'Run "pydoc -k <keyword>" to search for a keyword in the synopsis lines\n', 'of all available modules.\n', '\n', 'Run "pydoc -n <hostname>" to start an HTTP server with the given\n', 'hostname (default: localhost) on the local machine.\n', '\n', 'Run "pydoc -p <port>" to start an HTTP server on the given port on the\n', 'local machine. Port number 0 can be used to get an arbitrary unused port.\n', '\n', 'Run "pydoc -b" to start an HTTP server on an arbitrary unused port and\n', 'open a Web browser to interactively browse documentation. 
Combine with\n', 'the -n and -p options to control the hostname and port used.\n', '\n', 'Run "pydoc -w <name>" to write out the HTML documentation for a module\n', 'to a file named "<name>.html".\n', '\n', 'Module docs for core modules are assumed to be in\n', '\n', ' https://docs.python.org/X.Y/library/\n', '\n', 'This can be overridden by setting the PYTHONDOCS environment variable\n', 'to a different URL or to a local directory containing the Library\n', 'Reference Manual pages.\n', '"""\n', "__all__ = ['help']\n", '__author__ = "Ka-Ping Yee <ping@lfw.org>"\n', '__date__ = "26 February 2001"\n', '\n', '__credits__ = """Guido van Rossum, for an excellent programming language.\n', 'Tommy Burnette, the original creator of manpy.\n', 'Paul Prescod, for all his work on onlinehelp.\n', 'Richard Chamberlain, for the first implementation of textdoc.\n', '"""\n', '\n', "# Known bugs that can't be fixed here:\n", '# - synopsis() cannot be prevented from clobbering existing\n', '# loaded modules.\n', '# - If the __file__ attribute on a module is a relative path and\n', '# the current directory is changed with os.chdir(), an incorrect\n', '# path will be displayed.\n', '\n', 'import builtins\n', 'import importlib._bootstrap\n', 'import importlib._bootstrap_external\n', 'import importlib.machinery\n', 'import importlib.util\n', 'import inspect\n', 'import io\n', 'import os\n', 'import pkgutil\n', 'import platform\n', 'import re\n', 'import sys\n', 'import sysconfig\n', 'import time\n', 'import tokenize\n', 'import types\n', 'import urllib.parse\n', 'import warnings\n', 'from collections import deque\n', 'from reprlib import Repr\n', 'from traceback import format_exception_only\n', '\n', '\n', '# --------------------------------------------------------- common routines\n', '\n', 'def pathdirs():\n', ' """Convert sys.path into a list of absolute, existing, unique paths."""\n', ' dirs = []\n', ' normdirs = []\n', ' for dir in sys.path:\n', " dir = os.path.abspath(dir or 
'.')\n", ' normdir = os.path.normcase(dir)\n', ' if normdir not in normdirs and os.path.isdir(dir):\n', ' dirs.append(dir)\n', ' normdirs.append(normdir)\n', ' return dirs\n', '\n', 'def _isclass(object):\n', ' return inspect.isclass(object) and not isinstance(object, types.GenericAlias)\n', '\n', 'def _findclass(func):\n', ' cls = sys.modules.get(func.__module__)\n', ' if cls is None:\n', ' return None\n', " for name in func.__qualname__.split('.')[:-1]:\n", ' cls = getattr(cls, name)\n', ' if not _isclass(cls):\n', ' return None\n', ' return cls\n', '\n', 'def _finddoc(obj):\n', ' if inspect.ismethod(obj):\n', ' name = obj.__func__.__name__\n', ' self = obj.__self__\n', ' if (_isclass(self) and\n', " getattr(getattr(self, name, None), '__func__') is obj.__func__):\n", ' # classmethod\n', ' cls = self\n', ' else:\n', ' cls = self.__class__\n', ' elif inspect.isfunction(obj):\n', ' name = obj.__name__\n', ' cls = _findclass(obj)\n', ' if cls is None or getattr(cls, name) is not obj:\n', ' return None\n', ' elif inspect.isbuiltin(obj):\n', ' name = obj.__name__\n', ' self = obj.__self__\n', ' if (_isclass(self) and\n', " self.__qualname__ + '.' 
+ name == obj.__qualname__):\n", ' # classmethod\n', ' cls = self\n', ' else:\n', ' cls = self.__class__\n', ' # Should be tested before isdatadescriptor().\n', ' elif isinstance(obj, property):\n', ' func = obj.fget\n', ' name = func.__name__\n', ' cls = _findclass(func)\n', ' if cls is None or getattr(cls, name) is not obj:\n', ' return None\n', ' elif inspect.ismethoddescriptor(obj) or inspect.isdatadescriptor(obj):\n', ' name = obj.__name__\n', ' cls = obj.__objclass__\n', ' if getattr(cls, name) is not obj:\n', ' return None\n', ' if inspect.ismemberdescriptor(obj):\n', " slots = getattr(cls, '__slots__', None)\n", ' if isinstance(slots, dict) and name in slots:\n', ' return slots[name]\n', ' else:\n', ' return None\n', ' for base in cls.__mro__:\n', ' try:\n', ' doc = _getowndoc(getattr(base, name))\n', ' except AttributeError:\n', ' continue\n', ' if doc is not None:\n', ' return doc\n', ' return None\n', '\n', 'def _getowndoc(obj):\n', ' """Get the documentation string for an object if it is not\n', ' inherited from its class."""\n', ' try:\n', " doc = object.__getattribute__(obj, '__doc__')\n", ' if doc is None:\n', ' return None\n', ' if obj is not type:\n', ' typedoc = type(obj).__doc__\n', ' if isinstance(typedoc, str) and typedoc == doc:\n', ' return None\n', ' return doc\n', ' except AttributeError:\n', ' return None\n', '\n', 'def _getdoc(object):\n', ' """Get the documentation string for an object.\n', '\n', ' All tabs are expanded to spaces. 
To clean up docstrings that are\n', ' indented to line up with blocks of code, any whitespace than can be\n', ' uniformly removed from the second line onwards is removed."""\n', ' doc = _getowndoc(object)\n', ' if doc is None:\n', ' try:\n', ' doc = _finddoc(object)\n', ' except (AttributeError, TypeError):\n', ' return None\n', ' if not isinstance(doc, str):\n', ' return None\n', ' return inspect.cleandoc(doc)\n', '\n', 'def getdoc(object):\n', ' """Get the doc string or comments for an object."""\n', ' result = _getdoc(object) or inspect.getcomments(object)\n', " return result and re.sub('^ *\\n', '', result.rstrip()) or ''\n", '\n', 'def splitdoc(doc):\n', ' """Split a doc string into a synopsis line (if any) and the rest."""\n', " lines = doc.strip().split('\\n')\n", ' if len(lines) == 1:\n', " return lines[0], ''\n", ' elif len(lines) >= 2 and not lines[1].rstrip():\n', " return lines[0], '\\n'.join(lines[2:])\n", " return '', '\\n'.join(lines)\n", '\n', 'def classname(object, modname):\n', ' """Get a class name and qualify it with a module name if necessary."""\n', ' name = object.__name__\n', ' if object.__module__ != modname:\n', " name = object.__module__ + '.' + name\n", ' return name\n', '\n', 'def isdata(object):\n', ' """Check if an object is of a type that probably means it\'s data."""\n', ' return not (inspect.ismodule(object) or _isclass(object) or\n', ' inspect.isroutine(object) or inspect.isframe(object) or\n', ' inspect.istraceback(object) or inspect.iscode(object))\n', '\n', 'def replace(text, *pairs):\n', ' """Do a series of global replacements on a string."""\n', ' while pairs:\n', ' text = pairs[1].join(text.split(pairs[0]))\n', ' pairs = pairs[2:]\n', ' return text\n', '\n', 'def cram(text, maxlen):\n', ' """Omit part of a string if needed to make it fit in a maximum length."""\n', ' if len(text) > maxlen:\n', ' pre = max(0, (maxlen-3)//2)\n', ' post = max(0, maxlen-3-pre)\n', " return text[:pre] + '...' 
+ text[len(text)-post:]\n", ' return text\n', '\n', "_re_stripid = re.compile(r' at 0x[0-9a-f]{6,16}(>+)