💾 Archived View for woodpeckersnest.space › ~schapps › projects › aox2html.gmi captured on 2024-12-17 at 09:47:08. Gemini links have been rewritten to link to archived content
-=-=-=-=-=-=-
Read XMPP PubSub posts, of type Atom Over XMPP, with an HTML browser.
This is a Python script which is intended for software that can parse HTML (Agregore, Dillo, Falkon, and Otter Browser).
This script receives an XMPP Query URI from stdin (standard input) and returns content to stdout (standard output).
This script utilizes Slixmpp to connect to the XMPP Telecommunication Network, and lxml to construct an XHTML document.
Test this script via command line:
printf 'xmpp:pubsub.movim.eu?;node=berlin-xmpp-meetup' | python ./xmpp.filter.dpi > berlin-xmpp-meetup.xhtml
#!/usr/bin/env python3
# coding: utf-8
#
# Usage:
#
# printf 'xmpp:pubsub.movim.eu?;node=berlin-xmpp-meetup' | python ./xmpp.filter.dpi > berlin-xmpp-meetup.xhtml
#
# Example URIs:
#
# xmpp:pubsub.movim.eu?;node=berlin-xmpp-meetup;item=schimon-presents-blasta-a-pubsub-bookmarks-system-and-rivista-a-pubsub-dynamic-iGbveu
# xmpp:pubsub.movim.eu?;node=berlin-xmpp-meetup;item=7363a41d-1146-40b3-ac0f-8ee2559591a3#let-s-meet-up-with-flow
# xmpp:edhelas@movim.eu?;node=urn:xmpp:microblog:0;item=working-on-launching-the-movim-network-qPBzwc
# xmpp:goffi@goffi.org?;node=urn:xmpp:microblog:0;item=libervia-v0-8-la-cecilia-BdQ4
# xmpp:blog.jmp.chat?;node=urn:xmpp:microblog:0;item=september-newsletter-2024#newsletter-esim-adapter-launch

import asyncio
import datetime
from lxml import etree
import select
from slixmpp import ClientXMPP
from slixmpp.exceptions import IqError, IqTimeout
from slixmpp.stanza.iq import Iq
import sys
from urllib.parse import urlparse
import xml.etree.ElementTree as ET

# Account used to query the PubSub service; replace with real credentials.
jabber_id = "your@jabber.id/Dillo"
pass_word = "your_jabber_password"

# Clark-notation prefix of the Atom Syndication Format namespace (RFC 4287).
ATOM_NAMESPACE = '{http://www.w3.org/2005/Atom}'


def _iq_failure(error):
    """Build the error result dict shared by all XmppXep0060 requests.

    The dict carries the exception under 'error', the stanza attached to the
    exception under 'iq', and the error condition and text under 'condition'
    and 'text'.
    """
    iq = error.iq
    return {
        'error': error,
        'iq': iq,
        # Fall back to attributes of the exception itself (if it has any)
        # when the attached stanza carries no error details.
        'condition': iq['error']['condition'] or getattr(error, 'condition', '') or '',
        'text': iq['error']['text'] or getattr(error, 'text', '') or '',
    }


def _child_text(parent, tag):
    """Return the text of the first Atom child `tag` of `parent`, or ''."""
    child = parent.find(ATOM_NAMESPACE + tag)
    # Compare against None explicitly: an Element without children is falsy.
    if child is None or not child.text:
        return ''
    return child.text


class XmppClient(ClientXMPP):
    """XMPP client which registers XEP-0060 and connects upon creation."""

    def __init__(self, jabber_id, pass_word):
        super().__init__(jabber_id, pass_word)
        self.register_plugin('xep_0060')
        self.connect()


class XmppXep0060:
    """Publish-Subscribe (XEP-0060) requests which report failures as data.

    Every coroutine returns a dict with key 'error' (None upon success) and
    key 'iq'; upon failure, keys 'condition' and 'text' are set as well.
    The first parameter is the client instance, passed explicitly, e.g.
    XmppXep0060.get_items(xmpp, jid, node).
    """

    async def get_item(self: ClientXMPP, jid: str, node: str, item_id: str):
        """Retrieve a single item of a node."""
        try:
            iq = await self.plugin['xep_0060'].get_item(
                jid, node, item_id, timeout=5)
        except (IqError, IqTimeout) as e:
            return _iq_failure(e)
        return {'error': None, 'iq': iq}

    async def get_items(self: ClientXMPP, jid: str, node: str):
        """Retrieve the items of a node."""
        try:
            iq = await self.plugin['xep_0060'].get_items(jid, node)
        except (IqError, IqTimeout) as e:
            return _iq_failure(e)
        return {'error': None, 'iq': iq}

    async def get_nodes(self: ClientXMPP, jid: str):
        """Retrieve the nodes of a PubSub service."""
        try:
            iq = await self.plugin['xep_0060'].get_nodes(jid, timeout=5)
        except (IqError, IqTimeout) as e:
            return _iq_failure(e)
        return {'error': None, 'iq': iq}


class Xml:
    """Conversion of PubSub Atom payloads to an XHTML document."""

    @staticmethod
    def _extract_entry(item, payload):
        """Return a dict describing one Atom entry, or None to skip it.

        An entry which has neither a title element nor a link element is
        skipped.
        """
        ns = ATOM_NAMESPACE
        link_elements = payload.findall(ns + 'link')
        if payload.find(ns + 'title') is None and not link_elements:
            return None

        entry = {'title': _child_text(payload, 'title') or 'No title'}

        # Always provide 'links', so that generate_xhtml can rely on the key.
        entry['links'] = [
            {'href': link.attrib.get('href', ''),
             'rel': link.attrib.get('rel', ''),
             'type': link.attrib.get('type', '')}
            for link in link_elements]

        entry['contents'] = [
            content.text
            for content in payload.findall(ns + 'content')
            if content.text]
        if not entry['contents']:
            # Fall back to the summary when no content text is available.
            summary_text = _child_text(payload, 'summary')
            if summary_text:
                entry['contents'].append(summary_text)

        entry['published'] = _child_text(payload, 'published')
        entry['updated'] = _child_text(payload, 'updated')

        entry['authors'] = []
        for author in payload.findall(ns + 'author'):
            entry_author = {}
            author_email = _child_text(author, 'email')
            if author_email:
                entry_author['email'] = author_email
            author_uri = _child_text(author, 'uri')
            if author_uri:
                entry_author['uri'] = author_uri
            # The name falls back to the URI, then to the email address.
            entry_author['name'] = (
                _child_text(author, 'name') or author_uri or author_email
                or None)
            entry['authors'].append(entry_author)

        entry['categories'] = [
            category.attrib['term']
            for category in payload.findall(ns + 'category')
            if category.attrib.get('term')]

        # Prefer the Atom identifier; otherwise use the PubSub item ID.
        entry['id'] = _child_text(payload, 'id') or item['id']
        return entry

    @staticmethod
    def extract_atom(iq: Iq):
        """Extract data from an Atom Syndication Format (RFC 4287) of a
        Publish-Subscribe (XEP-0060) node item.

        Returns a dict with keys 'title' (bare JID of the sender),
        'subtitle' (node name), 'language' and 'items' (list of entry
        dicts, in reverse order of the stanza).
        """
        items = iq['pubsub']['items']
        atom = {
            'title': iq['from'].bare,
            'subtitle': items['node'],
            'language': items['lang'],
            'items': [],
        }
        for item in reversed(list(items)):
            payload = item['payload']
            if payload is None:
                # Nothing to render for an item without a payload.
                continue
            entry = Xml._extract_entry(item, payload)
            if entry is not None:
                atom['items'].append(entry)
        return atom

    @staticmethod
    def generate_xhtml(atom: dict):
        """Generate an XHTML document.

        `atom` is a dict as returned by extract_atom. The document is
        returned as a string.
        """
        e_html = ET.Element('html')
        e_html.set('xmlns', 'http://www.w3.org/1999/xhtml')
        e_head = ET.SubElement(e_html, 'head')
        ET.SubElement(e_head, 'title').text = atom['title']
        ET.SubElement(e_head, 'link', {'rel': 'stylesheet',
                                       'href': 'pubsub.css'})
        e_body = ET.SubElement(e_html, "body")
        ET.SubElement(e_body, "h1").text = atom['title']
        ET.SubElement(e_body, "h2").text = atom['subtitle']
        for item in atom['items']:
            e_article = ET.SubElement(e_body, 'article')
            e_title = ET.SubElement(e_article, 'h3')
            e_title.text = item['title']
            # Attribute values must be strings in order to be serialized.
            e_title.set('id', item['id'] or '')
            e_date = ET.SubElement(e_article, 'h4')
            e_date.text = item['published']
            e_date.set('title', 'Updated: ' + (item['updated'] or ''))
            authors = item['authors']
            if authors:
                e_authors = ET.SubElement(e_article, "dl")
                ET.SubElement(e_authors, "dt").text = 'Authors'
                for author in authors:
                    e_dd = ET.SubElement(e_authors, 'dd')
                    e_author = ET.SubElement(e_dd, 'a')
                    # Keys 'uri' and 'email' are present only when set.
                    author_uri = author.get('uri')
                    author_email = author.get('email')
                    e_author.text = (author.get('name') or author_uri
                                     or author_email)
                    if author_email:
                        e_author.set('href', 'mailto:' + author_email)
                    elif author_uri:
                        e_author.set('href', author_uri)
            ET.SubElement(e_article, 'p').text = ' '.join(item['contents'])
            links = item.get('links')
            if links:
                e_links = ET.SubElement(e_article, "dl")
                e_links.set('class', 'links')
                ET.SubElement(e_links, "dt").text = 'Links'
                for link in links:
                    e_dd = ET.SubElement(e_links, 'dd')
                    e_link = ET.SubElement(e_dd, 'a')
                    e_link.set('href', link['href'])
                    # Show the address when the relation is not specified,
                    # so that the anchor is never empty.
                    e_link.text = link['rel'] or link['href']
                    if link['type']:
                        ET.SubElement(e_dd, 'span').text = link['type']
            categories = item['categories']
            if categories:
                e_categories = ET.SubElement(e_article, "dl")
                e_categories.set('class', 'categories')
                ET.SubElement(e_categories, "dt").text = 'Categories'
                for category in categories:
                    ET.SubElement(e_categories, 'dd').text = category
        return ET.tostring(e_html, encoding='unicode')


class Url:
    """Handling of XMPP Query URIs."""

    @staticmethod
    def fragment_uri(uri: str):
        """Extract data and parameters from a given URI.

        Returns a dict with keys 'error', 'jid', 'node' and 'item'. Key
        'error' is None when the scheme is 'xmpp', otherwise it holds a
        message.
        """
        xmpp_uri = {'error': '', 'jid': '', 'node': '', 'item': ''}
        # Surrounding whitespace (e.g. a trailing newline from stdin) would
        # otherwise end up inside the last parameter.
        fragmented_uri = urlparse(uri.strip())
        if fragmented_uri.scheme != 'xmpp':
            xmpp_uri['error'] = 'Please enter a valid XMPP Query URI.'
            return xmpp_uri
        xmpp_uri['error'] = None
        xmpp_uri['jid'] = fragmented_uri.path
        for parameter in fragmented_uri.query.split(';'):
            key, separator, value = parameter.partition('=')
            if separator and key in ('node', 'item'):
                xmpp_uri[key] = value
        return xmpp_uri


class DilloXmpp:
    """Entry point which turns a PubSub address into output text."""

    @staticmethod
    async def engage(xmpp: ClientXMPP, jid: str, node=None, item_id=None):
        """Begin processing XMPP PubSub.

        Requests a single item, the items of a node, or the nodes of a
        service, depending on which of `node` and `item_id` are given.
        Returns an XHTML document, or an error message.
        """
        if not jid:
            if node:
                return 'PubSub parameter (Jabber ID) appears to be missing.'
            return 'Please check PubSub JID and Node ID.'
        if node and item_id:
            res = await XmppXep0060.get_item(xmpp, jid, node, item_id)
        elif node:
            res = await XmppXep0060.get_items(xmpp, jid, node)
        else:
            res = await XmppXep0060.get_nodes(xmpp, jid)
        if res['error']:
            return res['condition'] + ' : ' + res['text']
        result = Xml.generate_xhtml(Xml.extract_atom(res['iq']))
        return result or 'Please check PubSub JID and Node ID.'


def main():
    """Read an XMPP Query URI from stdin and write the result to stdout."""
    uri = sys.stdin.read().strip()
    xmpp_uri = Url.fragment_uri(uri)
    if not xmpp_uri['error']:
        xmpp = XmppClient(jabber_id, pass_word)
        loop = asyncio.get_event_loop()
        result = loop.run_until_complete(
            DilloXmpp.engage(xmpp, xmpp_uri['jid'], xmpp_uri['node'],
                             xmpp_uri['item']))
        # FIXME The event loop is intentionally not closed here.
        xmpp.disconnect(
            ignore_send_queue=False,
            reason='Dillo: Receiving data of type Atom Over XMPP has been completed.',
            wait=2.0)
    else:
        result = 'Please check the validity of the XMPP Query URI: {}'.format(uri)
    sys.stdout.write(result)
    sys.stdout.flush()
    sys.exit()


if __name__ == '__main__':
    main()
A request to confirm the usability with the Dillo Browser.