💾 Archived View for gemini.ctrl-c.club › ~anedroid › gists › worker.py captured on 2024-06-16 at 13:15:07.

View Raw

More Information

-=-=-=-=-=-=-

from functools import partial, wraps
from queue import Queue
from threading import Thread, Event


class Transaction:
    def __init__(self, job):
        self.job = job
        self.finished = Event()
        self.result = None


def activate(target):
    @wraps(target)
    def inner(self, *args, **kwargs):
        transaction = Transaction(partial(target, self, *args, **kwargs))
        self.jobs.put(transaction)
        transaction.finished.wait()
        return transaction.result

    return inner


class Worker:
    def __init__(self):
        self.jobs = Queue()
        Thread(target=self.loop).start()

    def loop(self):
        while transaction := self.jobs.get():
            print('Running job', transaction.job)
            transaction.result = transaction.job()
            transaction.finished.set()

    @activate
    def example(self, value):
        return value


worker = Worker()
print(worker.example(3))
worker.jobs.put(None)