import traceback import json import requests import traceback import json import re import feedparser import requests import logging from datetime import datetime from time import mktime from typing import Annotated from fastapi import Depends, APIRouter from settings.defaults import Settings, get_settings router = APIRouter() logger = logging.getLogger(__name__) @router.get("/the_local", summary="The Local RSS") async def update(settings: Annotated[Settings, Depends(get_settings)]): feed_url = settings.feeds['the_local']['url'] mastodon_server = settings.mastodon_server mastodon_aid = settings.feeds['the_local']['account_id'] mastodon_token = str(settings.feeds['the_local']['token']) mastodon_get_statuses_url=f'{mastodon_server}/api/v1/accounts/{mastodon_aid}/statuses' mastodon_post_statuses_url=f'{mastodon_server}/api/v1/statuses' try: last_status_timestamp=datetime.fromisoformat(load_last_status(mastodon_get_statuses_url, mastodon_token)['created_at']) new_entries=load_feed_rss(feed_url, last_status_timestamp) logger.info(f'Found {len(new_entries)} new entries since {last_status_timestamp}') if (len(new_entries) == 0): return { "status": 200, "body": { "posted_entries": 0, "successful": True } } posted_entries=list(map(lambda x: post_rss_entry_to_mastodon(mastodon_post_statuses_url, mastodon_token, x), new_entries)) return { "status": 200, "body": { "posted_entries": len(posted_entries), "successful": True } } except Exception: return { "status": 501, "body": { "posted_entries": 0, "message": ''.join(traceback.format_exception_only()), "successful": False } } def load_last_status(url, token): response=requests.get(url + '?limit=1', headers={ 'Authorization' : f'Bearer {token}' }) if response.status_code != 200: raise Exception('Failed to contact Mastodon', response.text) return json.loads(response.text)[0] def post_rss_entry_to_mastodon(url, token, entry): title = entry.title description = entry.summary link = entry.link if 'tags' in entry: categories = [t.get('term') for t in entry.tags] categories = map(lambda str: re.sub(r'\s+','', str), categories) categories = map(lambda str: re.sub(r'[0-9.\–\-()]+','', str), categories) categories = map(lambda str: str.capitalize(), categories) categories = [str for str in categories if len(str) >= 3] if len(categories) > 0: categories = map(lambda str: str.capitalize(), categories) categories = map(lambda str: str if str.startswith('#') else f'#{str}', categories) categories = ' '.join(categories) message = f"{title}\n\n{description}\n\n{link}\n\n{categories}" else: message = f"{title}\n\n{description}\n\n{link}" else: message = f"{title}\n\n{description}\n\n{link}" headers = { 'Authorization': f'Bearer {token}', 'Content-type': 'application/x-www-form-urlencoded', 'User-Agent': 'Serverless Feed' } params = { 'status': message, 'language': 'en', 'visibility': 'public' } logger.info('posting to', url) response = requests.post(url, data=params, headers=headers) if response.status_code != 200: logger.error('Failed to post message', response) return response def load_feed_rss(url, since): feed=feedparser.parse(url) return [entry for entry in feed.entries if datetime.fromtimestamp(mktime(entry.published_parsed)) > since.replace(tzinfo=datetime.fromtimestamp(mktime(entry.published_parsed)).tzinfo)]