"""Render the latest public Mastodon statuses to a static HTML page on S3."""

import json
import logging
import traceback
from datetime import datetime, timezone
from pathlib import Path
from typing import Annotated

import boto3
import requests
import timeago
from fastapi import APIRouter, Depends
from yattag import Doc, indent

from settings.defaults import Settings, get_settings

router = APIRouter()
logger = logging.getLogger(__name__)

# Upper bound (seconds) for the Mastodon API call so a stalled server cannot
# block the request handler indefinitely.
_REQUEST_TIMEOUT_SECONDS = 10


@router.get("/generate", summary="Embeddable Mastodon Feed")
async def generate_static_page(settings: Annotated[Settings, Depends(get_settings)]):
    """Fetch recent statuses, render them as HTML and upload the page to S3.

    Returns a dict with ``status`` 200 and ``successful: True`` on success,
    or ``status`` 501 with the formatted exception message on any failure.
    """
    # NOTE(review): requests and boto3 are blocking; inside an ``async def``
    # endpoint they stall the event loop for the duration of the calls.
    embed = settings.feeds['embed']
    mastodon_token = embed['token']
    s3_bucket = embed['s3_bucket']
    s3_filename = embed['s3_key']
    mastodon_get_statuses_url = embed['url']

    try:
        latest_statuses = load_latest_statuses(
            mastodon_get_statuses_url, mastodon_token, 20
        )
        latest_statuses = [
            status for status in latest_statuses if _is_embeddable(status)
        ]
        html = convertToHTML(latest_statuses)
        uploadToAmazonS3(s3_bucket, s3_filename, html)
        return {
            "status": 200,
            "body": {
                "successful": True
            }
        }
    except Exception as e:
        # Top-level boundary: report the failure in the response payload
        # instead of propagating it, and keep the stack trace in the log.
        msg = ''.join(traceback.format_exception_only(e))
        logger.exception(msg)
        return {
            "status": 501,
            "body": {
                "message": msg,
                "successful": False
            }
        }


def _is_embeddable(status) -> bool:
    """Return True for original (non-reply, non-boost), public, English posts."""
    return (
        status['in_reply_to_id'] is None
        and status['in_reply_to_account_id'] is None
        and status['reblog'] is None
        and status['visibility'] == 'public'
        and status['language'] == 'en'
    )


def load_latest_statuses(url, token, limit):
    """Request up to ``limit`` statuses from ``url`` using a bearer token.

    Returns the decoded JSON response body.

    Raises:
        Exception: if the server answers with a status code other than 200.
        requests.RequestException: on connection errors or timeout.
    """
    response = requests.get(
        url,
        params={'limit': limit},
        headers={'Authorization': f'Bearer {token}'},
        timeout=_REQUEST_TIMEOUT_SECONDS,
    )
    if response.status_code != 200:
        raise Exception('Failed to contact Mastodon', response.text)
    return json.loads(response.text)


def _to_naive_utc(timestamp: str) -> datetime:
    """Parse an ISO-8601 timestamp into a naive datetime expressed in UTC."""
    # datetime.fromisoformat() only accepts a trailing 'Z' from Python 3.11 on.
    if timestamp.endswith('Z'):
        timestamp = timestamp[:-1] + '+00:00'
    parsed = datetime.fromisoformat(timestamp)
    if parsed.tzinfo is None:
        # No offset given; treated as already being UTC.
        return parsed
    return parsed.astimezone(timezone.utc).replace(tzinfo=None)


def _render_status(doc, tag, text, status, now):
    """Append the markup for a single status to ``doc``."""
    # NOTE(review): the nesting below was reconstructed from a source file
    # whose indentation had been lost -- confirm it against the stylesheet.
    account = status['account']
    with tag('div', klass='item'):
        with tag('div', klass='author'):
            with tag('a', target='_top', klass='avatar', href=account['url']):
                doc.stag('img', klass='avatar', src=account['avatar'])
            with tag('div', klass='author-info'):
                with tag('a', target='_top', klass='author-displayname',
                         href=account['url']):
                    text(account['display_name'])
                with tag('div', klass='author-fullname'):
                    text(account['username'], '@', 'ahlroos.me')
        with tag('div', klass='item-content'):
            # The status body is inserted as raw HTML, not escaped.
            doc.asis(status['content'])
        for attachment in status['media_attachments']:
            with tag('a', target='_top', klass='enclosure',
                     href=attachment['url']):
                doc.stag(
                    'img',
                    src=attachment['preview_url'] or '',
                    alt=attachment['description'] or '',
                    title=attachment['description'] or '',
                )
        with tag('a', target='_top', klass='date', href=status['uri']):
            text(timeago.format(_to_naive_utc(status['created_at']), now))


def convertToHTML(statuses):
    """Render ``statuses`` into an indented, self-contained HTML document.

    The stylesheet ``../resources/embed.css`` (relative to this module) is
    inlined into the page's ``<style>`` element.
    """
    css_file = Path(__file__).parent / '../resources/embed.css'
    stylesheet = css_file.read_text(encoding='utf-8')

    # Both sides of the "time ago" computation are naive UTC so the result
    # does not depend on the server's local timezone.
    now = datetime.now(timezone.utc).replace(tzinfo=None)

    doc, tag, text = Doc().tagtext()
    with tag('html'):
        with tag('head'):
            doc.stag('meta', charset='UTF-8')
            # <base> has no charset attribute; target makes every link,
            # including those inside the raw status HTML, open in the top frame.
            doc.stag('base', target='_top')
            with tag('style'):
                doc.asis(stylesheet)
        with tag('body'):
            with tag('div', klass='container'):
                for status in statuses:
                    _render_status(doc, tag, text, status, now)
    return indent(doc.getvalue())


def uploadToAmazonS3(bucket, key, content):
    """Store ``content`` at ``bucket``/``key`` as a publicly readable HTML object."""
    s3 = boto3.resource('s3')
    s3_object = s3.Object(bucket, key)  # type: ignore
    s3_object.put(Body=content, ACL='public-read', ContentType='text/html')