💾 Archived View for cosmic.voyage › betsy › q2n.txt captured on 2022-06-03 at 23:14:51.
View Raw
More Information
-=-=-=-=-=-=-
#!/usr/bin/env python3
"""
q2n - QEC to NNTP sync
This script syncs QEC logs to NNTP.
- everything configurable via `Config`
- has a throttler so we don't accidentally submit too much at a time
- has a dry-run for submission
- it remembers what has already been submitted so they don't get submitted
again
- read from argv or a config file
- set up a cron job
- put it on tildegit (once my application issue gets sorted out)
"""
from dataclasses import dataclass
import datetime as dt
import hashlib
import io
import logging
import os
import pickle
import pwd
import random
import re
import subprocess as sp
import time
import typing as t
_LOGGER = logging.getLogger(__name__)
Path = str
User = str
NntpArticleBody = str
LogEntryHash = str
@dataclass
class Config:
listing_dir: str
listing_filename: str
nntp_group: str
nntp_server: str
max_submission: int
submission_store_dir: Path
@classmethod
def create(cls):
return Config(
listing_dir="/var/gopher/",
listing_filename="listing.gophermap",
nntp_server="localhost",
# TODO: find more appropriate one
nntp_group="cosmic.worldbuilding",
max_submission=5,
submission_store_dir="/var/tmp/q2n",
)
@dataclass
class Ship:
name: str
owner: User
@dataclass
class LogEntry:
ship: Ship
author: User
title: str
file_name: str
class LogIterator(t.Protocol):
def __call__(self) -> t.List[LogEntry]: ...
class SubmitCondition(t.Protocol):
def __call__(self, log_entry: LogEntry) -> bool: ...
class LogSubmitter(t.Protocol):
def __call__(self, log: LogEntry) -> None: ...
@dataclass
class Utils:
config: Config
def ship_owner(self, ship_name: str) -> User:
return self._get_path_user(
f"{self.config.listing_dir}/{ship_name}"
)
def read_log_content(self, log: LogEntry) -> str:
return self._read_log_entry(
f"{self.config.listing_dir}/{log.ship.name}/{log.file_name}"
)
@staticmethod
def _read_log_entry(path: str) -> str:
with open(path, "r", encoding="utf-8") as f:
return f.read()
@staticmethod
def _get_path_user(fp: str) -> User:
st = os.stat(fp)
return pwd.getpwuid(st.st_uid).pw_name
@dataclass
class SubmittedLogsStore:
store_dir: str
def __post_init__(self):
import subprocess as sp
sp.check_call(
f"mkdir -p {self.store_dir}",
shell=True
)
def record_submission(self, log: LogEntry):
with open(f"{self.store_dir}/{self.checksum(log)}", "wb") as f:
pickle.dump(log, f)
def load_submitted_logs(self) -> t.List[LogEntryHash]:
return os.listdir(self.store_dir)
@staticmethod
def checksum(log: LogEntry) -> LogEntryHash:
import hashlib
checked_str = f"{log.ship.name}{log.file_name}"
return hashlib.md5(checked_str.encode("utf-8")).hexdigest()
# Throttles log entries to submit. Just in case there's a bug.
# Usually we'd limit logs to submit to a small number, and maybe also
# send out some alert.
SubmissionThrottle = t.Callable[[t.List[LogEntry]], t.List[LogEntry]]
@dataclass
class ListingFileLogIterator(LogIterator):
listing_dir: str
listing_filename: str
utils: Utils
def __call__(self) -> t.List[LogEntry]:
with open(
f"{self.listing_dir}/{self.listing_filename}",
"r",
encoding="utf-8"
) as f:
entries = f.readlines()
return [self._parse(ent) for ent in entries]
def _parse(self, entry: str) -> LogEntry:
"""Parse a listing file entry into a `LogEntry`
An entry looks like this:
0betsy - About QEC /betsy/qec.txt
I.e.
0<ship> - <title><TAB><file_path>
Note:
* <file_path> is rooted at /var/gohper, i.e., where the listing
file resides.
"""
import re
res = re.match(r"^0(.+?) - (.+)\t(.+)$", entry)
if not res: raise ValueError(f"Cannot parse: {entry}")
# It's more robust to use the file path (/ship/fn.txt) to obtain ship's
# name, rather than res.group(1). This is b/c there're duplicated
# entries in the listing:
# 0Polonia - 24131 /Polonia-II/24131.txt
# 0Polonia-II - 24131 /Polonia-II/24131.txt
title = res.group(2)
log_path = res.group(3)
ship, log_fn = self._parse_log_file_name(log_path)
ship_owner = self.utils.ship_owner(ship)
return LogEntry(
ship=Ship(name=ship, owner=ship_owner),
author=ship_owner,
title=title,
file_name=log_fn,
)
@staticmethod
def _parse_log_file_name(ship_and_file: str) -> t.Tuple[str, str]:
"/<ship>/file.txt -> (<ship>, file.txt)"
return t.cast(
t.Tuple[str, str],
tuple(x for x in ship_and_file.split("/") if x),
)
@dataclass
class SubmitConditionImpl(SubmitCondition):
submission_store: SubmittedLogsStore
def __call__(self, log_entry: LogEntry) -> bool:
return (
self.submission_store.checksum(log_entry)
not in self.submission_store.load_submitted_logs()
)
@dataclass
class NntpLogSubmitter(LogSubmitter):
@dataclass
class NntpLogFormat:
subject: str
body: str
from_: str
submission_store: SubmittedLogsStore
read_log_entry: t.Callable[[LogEntry], NntpArticleBody]
nntp_group: str
nntp_server: str
dry_run: bool = False
def __call__(self, log: LogEntry) -> None:
self.nntp_submit(log)
self.submission_store.record_submission(log)
def add_envelope(self, article: str, log: LogEntry) -> str:
return f"""\
TIMESTAMP: {int(time.time())} SGT
AUTHOR: {log.author}
ORIGINATING SHIP: {log.ship.name}
QEC GATEWAY: QG-{random.randint(0, 31)}
{article}
"""
def nntp_submit(self, log: LogEntry) -> None:
import nntplib as nn
s = nn.NNTP(self.nntp_server, readermode=True)
article_body = self.read_log_entry(log)
article_body = self.add_envelope(article_body, log)
msg = f"""\
Newsgroups: {self.nntp_group}
Subject: [QEC] {log.title}
From: {log.author} "{log.author}@cosmic.voyage"
{article_body}
"""
f = io.BytesIO(msg.encode("utf-8"))
f.seek(0)
_LOGGER.info(f"About to submit log:\n{msg}")
if not self.dry_run:
s.post(f)
@dataclass
class SubmissionThrottler:
max_submission: int
def __call__(self, logs: t.List[LogEntry]) -> t.List[LogEntry]:
return logs[0:self.max_submission]
def main():
logging.basicConfig()
logging.root.setLevel(logging.INFO)
config = Config.create()
_LOGGER.info(f"Running with config: {config}")
utils = Utils(config=config)
iterate_logs = ListingFileLogIterator(
listing_dir=config.listing_dir,
listing_filename=config.listing_filename,
utils=utils,
)
throttler = SubmissionThrottler(config.max_submission)
submission_store = SubmittedLogsStore(store_dir=config.submission_store_dir)
should_submit = SubmitConditionImpl(submission_store=submission_store)
submit_log = NntpLogSubmitter(
submission_store=submission_store,
read_log_entry=utils.read_log_content,
nntp_group=config.nntp_group,
nntp_server=config.nntp_server,
dry_run=True, # TODO remove
)
logs_to_submit = [log for log in iterate_logs() if should_submit(log)]
### # FOR TEST: remove - randomly choose one log
### logs_to_submit = logs_to_submit[random.randint(0, len(logs_to_submit)-2):][0:]
logs_to_submit = throttler(logs_to_submit)
_LOGGER.info(f"Submitting {len(logs_to_submit)} logs...")
for log in logs_to_submit: submit_log(log)
if __name__ == "__main__":
main()