💾 Archived View for woodpeckersnest.space › ~schapps › projects › aox2html.gmi captured on 2024-12-17 at 09:47:08. Gemini links have been rewritten to link to archived content

View Raw

More Information

-=-=-=-=-=-=-

AOX2HTML - Atom Over XMPP To HTML

Read XMPP PubSub posts, of type Atom Over XMPP, with HTML browser.

About

This is a Python script which is intended for software that can parse HTML (Agregore, Dillo, Falkon, and Otter Browser).

This script receives XMPP Query URI from stdin (standard input) and returns content to stdout (standard output).

This script utilizes Slixmpp to connect to the XMPP Telecommunication Network, and lxml to construct an XHTML document.

Usage

Test this script via command line:

printf 'xmpp:pubsub.movim.eu?;node=berlin-xmpp-meetup' | python ./xmpp.filter.dpi > berlin-xmpp-meetup.xhtml

Code

#!/usr/bin/env python3
# coding: utf-8
#
# Usage:
#
# printf 'xmpp:pubsub.movim.eu?;node=berlin-xmpp-meetup' | python ./xmpp.filter.dpi > berlin-xmpp-meetup.xhtml
#
# Example URIs:
#
# xmpp:pubsub.movim.eu?;node=berlin-xmpp-meetup;item=schimon-presents-blasta-a-pubsub-bookmarks-system-and-rivista-a-pubsub-dynamic-iGbveu
# xmpp:pubsub.movim.eu?;node=berlin-xmpp-meetup;item=7363a41d-1146-40b3-ac0f-8ee2559591a3#let-s-meet-up-with-flow
# xmpp:edhelas@movim.eu?;node=urn:xmpp:microblog:0;item=working-on-launching-the-movim-network-qPBzwc
# xmpp:goffi@goffi.org?;node=urn:xmpp:microblog:0;item=libervia-v0-8-la-cecilia-BdQ4
# xmpp:blog.jmp.chat?;node=urn:xmpp:microblog:0;item=september-newsletter-2024#newsletter-esim-adapter-launch

import asyncio
import datetime
from lxml import etree
import select
from slixmpp import ClientXMPP
from slixmpp.exceptions import IqError, IqTimeout
from slixmpp.stanza.iq import Iq
import sys
from urllib.parse import urlparse
import xml.etree.ElementTree as ET

jabber_id = "your@jabber.id/Dillo"
pass_word = "your_jabber_password"

class XmppClient(ClientXMPP):
    def __init__(self, jabber_id, pass_word):
        super().__init__(jabber_id, pass_word)
        self.register_plugin('xep_0060')
        self.connect()


class XmppXep0060:

    async def get_item(self: ClientXMPP, jid: str, node: str, item_id: str):
        result = {'error' : None}
        try:
            result['iq'] = await self.plugin['xep_0060'].get_item(jid, node, item_id, timeout=5)
        except (IqError, IqTimeout) as e:
            iq = e.iq
            result['error'] = e
            result['iq'] = iq
            result['condition'] = iq['error']['condition']
            result['text'] = iq['error']['text']
        return result
        
    
    async def get_items(self: ClientXMPP, jid: str, node: str):
        result = {'error' : None}
        try:
            result['iq'] = await self.plugin['xep_0060'].get_items(jid, node)
        except (IqError, IqTimeout) as e:
            iq = e.iq
            result['error'] = e
            result['iq'] = iq
            result['condition'] = iq['error']['condition']
            result['text'] = iq['error']['text']
        return result
    
    async def get_nodes(self: ClientXMPP, jid: str):
        result = {'error' : None}
        try:
            result['iq'] = await self.plugin['xep_0060'].get_nodes(jid, timeout=5)
        except (IqError, IqTimeout) as e:
            iq = e.iq
            result['error'] = e
            result['iq'] = iq
            result['condition'] = iq['error']['condition']
            result['text'] = iq['error']['text']
        return result


class Xml:

    def extract_atom(iq: Iq):
        """Extract data from an Atom Syndication Format (RFC 4287) of a Publish-Subscribe (XEP-0060) node item."""
        jid = iq['from'].bare
        node = iq['pubsub']['items']['node']
        atom = {}
        atom['title'] = jid
        atom['subtitle'] = node
        atom['language'] = iq['pubsub']['items']['lang']
        atom['items'] = []
        items = iq['pubsub']['items']
        for item in list(items)[::-1]:
            atom_item = {}
            item_payload = item['payload']
            namespace = '{http://www.w3.org/2005/Atom}'
            title = item_payload.find(namespace + 'title')
            links = item_payload.find(namespace + 'link')
            if (not isinstance(title, ET.Element) and
                not isinstance(links, ET.Element)): continue
            title_text = 'No title' if title == None else title.text
            atom_item['title'] = title_text
            if isinstance(links, ET.Element):
                atom_item['links'] = []
                for link in item_payload.findall(namespace + 'link'):
                    link_href = link.attrib['href'] if 'href' in link.attrib else ''
                    link_type = link.attrib['type'] if 'type' in link.attrib else ''
                    link_rel = link.attrib['rel'] if 'rel' in link.attrib else ''
                    atom_item['links'].append({'href': link_href,
                                               'rel': link_rel,
                                               'type': link_type})
            contents = item_payload.find(namespace + 'content')
            atom_item['contents'] = []
            if isinstance(contents, ET.Element):
                for content in item_payload.findall(namespace + 'content'):
                    if not content.text: continue
                    content_text = content.text
                    content_type = content.attrib['type'] if 'type' in content.attrib else 'html'
                    content_type_text = 'html' if 'html' in content_type else 'text'
                    atom_item['contents'].append(content_text)
            else:
                summary = item_payload.find(namespace + 'summary')
                summary_text = summary.text if summary else None
                if summary_text:
                    summary_type = summary.attrib['type'] if 'type' in summary.attrib else 'html'
                    summary_type_text = 'html' if 'html' in summary_type else 'text'
                    atom_item['contents'].append(summary_text)
               # else:
               #     atom_item['contents'].append('No content.')
            published = item_payload.find(namespace + 'published')
            published_text = '' if published == None else published.text
            atom_item['published'] = published_text
            updated = item_payload.find(namespace + 'updated')
            updated_text = '' if updated == None else updated.text
            atom_item['updated'] = updated_text
            atom_item['authors'] = []
            authors = item_payload.find(namespace + 'author')
            if isinstance(authors, ET.Element):
                for author in item_payload.findall(namespace + 'author'):
                    atom_item_author = {}
                    author_email = author.find(namespace + 'email')
                    if author_email is not None:
                        author_email_text = author_email.text
                        if author_email_text:
                            atom_item_author['email'] = author_email_text
                    else:
                        author_email_text = None
                    author_uri = author.find(namespace + 'uri')
                    if author_uri is not None:
                        author_uri_text = author_uri.text
                        if author_uri_text:
                            atom_item_author['uri'] = author_uri_text
                    else:
                        author_uri_text = None
                    author_name = author.find(namespace + 'name')
                    if author_name is not None and author_name.text:
                        author_name_text = author_name.text
                    else:
                        author_name_text = author_uri_text or author_email_text
                    atom_item_author['name'] = author_name_text
                    atom_item['authors'].append(atom_item_author)
            categories = item_payload.find(namespace + 'category')
            atom_item['categories'] = []
            if isinstance(categories, ET.Element):
                for category in item_payload.findall(namespace + 'category'):
                    if 'term' in category.attrib and category.attrib['term']:
                        category_term = category.attrib['term']
                        atom_item['categories'].append(category_term)
            identifier = item_payload.find(namespace + 'id')
            if identifier is not None and identifier.attrib: print(identifier.attrib)
            identifier_text = item['id'] if identifier == None else identifier.text
            atom_item['id'] = identifier_text
            #atom_item['id'] = item['id']
            atom['items'].append(atom_item)
        return atom

    def generate_xhtml(atom: dict):
        """Generate an XHTML document."""
        e_html = ET.Element('html')
        e_html.set('xmlns', 'http://www.w3.org/1999/xhtml')
        e_head = ET.SubElement(e_html, 'head')
        ET.SubElement(e_head, 'title').text = atom['title']
        ET.SubElement(e_head, 'link', {'rel': 'stylesheet',
                                       'href': 'pubsub.css'})
        e_body = ET.SubElement(e_html, "body")
        ET.SubElement(e_body, "h1").text = atom['title']
        ET.SubElement(e_body, "h2").text = atom['subtitle']
        for item in atom['items']:
            item_id = item['id']
            title = item['title']
            links = item['links']
            e_article = ET.SubElement(e_body, 'article')
            e_title = ET.SubElement(e_article, 'h3')
            e_title.text = item['title']
            e_title.set('id', item['id'])
            e_date = ET.SubElement(e_article, 'h4')
            e_date.text = item['published']
            e_date.set('title', 'Updated: ' + item['updated'])
            authors = item['authors']
            if authors:
                e_authors = ET.SubElement(e_article, "dl")
                ET.SubElement(e_authors, "dt").text = 'Authors'
                for author in authors:
                    e_dd = ET.SubElement(e_authors, 'dd')
                    e_author = ET.SubElement(e_dd, 'a')
                    e_author.text = author['name'] or author['uri'] or author['email']
                    if 'email' in author and author['email']:
                        e_author.set('href', 'mailto:' + author['email'])
                    elif 'uri' in author and author['uri']:
                        e_author.set('href', author['uri'])
            ET.SubElement(e_article, 'p').text = ' '.join(item['contents'])
            if links:
                e_links = ET.SubElement(e_article, "dl")
                e_links.set('class', 'links')
                ET.SubElement(e_links, "dt").text = 'Links'
                for link in links:
                    e_dd = ET.SubElement(e_links, 'dd')
                    e_link = ET.SubElement(e_dd, 'a')
                    e_link.set('href', link['href'])
                    e_link.text = link['rel']
                    if link['type']: ET.SubElement(e_dd, 'span').text = link['type']
            categories = item['categories']
            if categories:
                e_categories = ET.SubElement(e_article, "dl")
                e_categories.set('class', 'categories')
                ET.SubElement(e_categories, "dt").text = 'Categories'
                for category in categories:
                    ET.SubElement(e_categories, 'dd').text = category
        return ET.tostring(e_html, encoding='unicode')


class Url:

    def fragment_uri(uri: str):
        """Extract data and parameters from a given URI."""
        xmpp_uri = {'error' : '',
                    'jid' : '',
                    'node' : '',
                    'item' : ''}
        fragmented_uri = urlparse(uri)
        if fragmented_uri.scheme == 'xmpp':
            xmpp_uri['error'] = None
            xmpp_uri['jid'] = fragmented_uri.path
            for parameter in fragmented_uri.query.split(';'):
                if parameter.startswith('node='):
                    xmpp_uri['node'] = parameter[5:]
                if parameter.startswith('item='):
                    xmpp_uri['item'] = parameter[5:]
        else:
            xmpp_uri['error'] = 'Please enter a valid XMPP Query URI.'
        return xmpp_uri


class DilloXmpp:

    async def engage(xmpp: ClientXMPP, jid: str, node=None, item_id=None):
        """Begin processing XMPP PubSub."""
        if jid and node and item_id:
            res = await XmppXep0060.get_item(xmpp, jid, node, item_id)
            if not res['error']:
                atom = Xml.extract_atom(res['iq'])
                result = Xml.generate_xhtml(atom)
            else:
                text = 'Please ensure that PubSub node "{}" and item "{}" are valid and accessible.'.format(node, item_id)
                result = res['condition'] + ' : ' + res['text']
        elif jid and node:
            res = await XmppXep0060.get_items(xmpp, jid, node)
            if not res['error']:
                atom = Xml.extract_atom(res['iq'])
                result = Xml.generate_xhtml(atom)
            else:
                text = 'Please ensure that PubSub node "{}" is valid and accessible.'.format(node)
                result = res['condition'] + ' : ' + res['text']
        elif jid:
            res = await XmppXep0060.get_nodes(xmpp, jid)
            if not res['error']:
                atom = Xml.extract_atom(res['iq'])
                result = Xml.generate_xhtml(atom)
            else:
                text = 'Please ensure that PubSub node "{}" is valid and accessible.'.format(node)
                result = res['condition'] + ' : ' + res['text']
        elif node:
            result = 'PubSub parameter (Jabber ID) appears to be missing.'
        if not result:
            result = 'Please check PubSub JID and Node ID.'
        return result


#if __name__ == '__main__':
uri = sys.stdin.read()
xmpp_uri = Url.fragment_uri(uri)
if not xmpp_uri['error']:
    xmpp = XmppClient(jabber_id, pass_word)
    jid = xmpp_uri['jid']
    node = xmpp_uri['node']
    item = xmpp_uri['item']
    loop = asyncio.get_event_loop()
    result = loop.run_until_complete(DilloXmpp.engage(xmpp, jid, node, item))
    # FIXME
    #loop.close()
    xmpp.disconnect(
        ignore_send_queue=False,
        reason='Dillo: Receiving data of type Atom Over XMPP has been completed.',
        wait=2.0)
else:
    result = 'Please check the validity of the XMPP Query URI: {}'.format(uri)

sys.stdout.write(result)
sys.stdout.flush()

sys.exit()

References

Dillo

A request to confirm the usability with the Dillo Browser.

XMPP PubSub Plugin - Dillo-dev - lists.mailman3.com