import traceback import json import requests import traceback import json import re import feedparser import requests import logging from datetime import datetime from time import mktime from typing import Annotated from fastapi import Depends, APIRouter from settings.defaults import Settings, get_settings router = APIRouter() logger = logging.getLogger(__name__) @router.get("/yle_fi", summary="Yle.fi Uutiset RSS") async def update(settings: Annotated[Settings, Depends(get_settings)]): feed_url = settings.feeds['yle_fi']['url'] mastodon_server = settings.mastodon_server mastodon_aid = settings.feeds['yle_fi']['account_id'] mastodon_token = settings.feeds['yle_fi']['token'] mastodon_get_statuses_url=f'{mastodon_server}/api/v1/accounts/{mastodon_aid}/statuses' mastodon_post_statuses_url=f'{mastodon_server}/api/v1/statuses' try: last_status_timestamp=datetime.fromisoformat(load_last_status(mastodon_get_statuses_url, mastodon_token)['created_at']) new_entries=load_feed_rss(feed_url, last_status_timestamp) logger.info(f'Found {len(new_entries)} new entries since {last_status_timestamp}') if (len(new_entries) == 0): return { "status": 200, "body": { "posted_entries": 0, "successful": True } } posted_entries=list(map(lambda x: post_rss_entry_to_mastodon(mastodon_post_statuses_url, mastodon_token, x), new_entries)) return { "status": 200, "body": { "posted_entries": len(posted_entries), "successful": True } } except Exception as e: msg = ''.join(traceback.format_exception_only(e)) logger.error(msg) return { "status": 501, "body": { "posted_entries": 0, "message": msg, "successful": False } } def split(arr, char): return [tag for subtags in (map(lambda str: str.split(char), arr)) for tag in subtags] def capitalize(arr, char): result = map(lambda str: str.split(char), arr) result = map(lambda subtag: map(lambda str: str.capitalize(), subtag), result) result = map(lambda subtag: ''.join(subtag), result) return result def load_last_status(url, token): response=requests.get(url + '?limit=1', headers={ 'Authorization' : f'Bearer {token}' }) if response.status_code != 200: raise Exception('Failed to contact Mastodon', response.text) return json.loads(response.text)[0] def post_rss_entry_to_mastodon(url, token, entry): title = entry.title description = entry.summary link = entry.link linkEnd = entry.link.find('?') if linkEnd > -1: link = entry.link[0:linkEnd] else: link = entry.link if 'tags' in entry: categories = [t.get('term') for t in entry.tags] categories = split(categories, ',') categories = capitalize(categories, ' ') categories = capitalize(categories, '–') categories = capitalize(categories, '-') categories = capitalize(categories, '/') categories = capitalize(categories, '\\') categories = map(lambda str: re.sub(r'\s+','', str), categories) categories = map(lambda str: re.sub(r'[0-9.()]+','', str), categories) categories = map(lambda str: re.sub('&','Ja', str), categories) categories = [str for str in categories if len(str) >= 3] if len(categories) > 0: categories = map(lambda str: str if str.startswith('#') else f'#{str}', categories) categories = ' '.join(categories) message = f"{title}\n\n{description}\n\n{link}\n\n{categories}" else: message = f"{title}\n\n{description}\n\n{link}" else: message = f"{title}\n\n{description}\n\n{link}" headers = { 'Authorization': f'Bearer {token}', 'Content-type': 'application/x-www-form-urlencoded', 'User-Agent': 'Serverless Feed' } params = { 'status': message, 'language': 'fi', 'visibility': 'public' } response = requests.post(url, data=params, headers=headers) if response.status_code != 200: logger.error('Failed to post message', response) return response def load_feed_rss(url, since): feed=feedparser.parse(url) return [entry for entry in feed.entries if datetime.fromtimestamp(mktime(entry.published_parsed)) > since.replace(tzinfo=datetime.fromtimestamp(mktime(entry.published_parsed)).tzinfo)]