import traceback import json import requests import traceback import json import feedparser import requests import logging from datetime import datetime from time import mktime from typing import Annotated from fastapi import Depends, APIRouter from settings.defaults import Settings, get_settings router = APIRouter() logger = logging.getLogger(__name__) @router.get("/hn", summary="Hacker News RSS") async def update(settings: Annotated[Settings, Depends(get_settings)]): feed_url = settings.feeds['hn']['url'] mastodon_server = settings.mastodon_server mastodon_aid = settings.feeds['hn']['account_id'] mastodon_token = str(settings.feeds['hn']['token']) mastodon_get_statuses_url=f'{mastodon_server}/api/v1/accounts/{mastodon_aid}/statuses' mastodon_post_statuses_url=f'{mastodon_server}/api/v1/statuses' try: last_status_timestamp=datetime.fromisoformat(load_last_status(mastodon_get_statuses_url, mastodon_token)['created_at']) new_entries=load_feed_rss(feed_url, last_status_timestamp) logger.info(f'Found {len(new_entries)} new entries since {last_status_timestamp}') if (len(new_entries) == 0): return { "status": 200, "body": { "posted_entries": 0, "successful": True } } posted_entries=list(map(lambda x: post_rss_entry_to_mastodon(mastodon_post_statuses_url, mastodon_token, x), new_entries)) return { "status": 200, "body": { "posted_entries": len(posted_entries), "successful": True } } except Exception: return { "status": 501, "body": { "posted_entries": 0, "message": ''.join(traceback.format_exception_only()), "successful": False } } def load_last_status(url, token): response=requests.get(url + '?limit=1', headers={ 'Authorization' : f'Bearer {token}' }) if response.status_code != 200: raise Exception('Failed to contact Mastodon', response.text) return json.loads(response.text)[0] def post_rss_entry_to_mastodon(url, token, entry): title = entry.title link = entry.link message = f"{title}\n\n{link}" headers = { 'Authorization': f'Bearer {token}', 'Content-type': 'application/x-www-form-urlencoded', 'User-Agent': 'Serverless Feed' } params = { 'status': message, 'language': 'en', 'visibility': 'public' } logger.info('posting to', url) response = requests.post(url, data=params, headers=headers) if response.status_code != 200: logger.error('Failed to post message', response) return response def load_feed_rss(url, since): feed=feedparser.parse(url) return [entry for entry in feed.entries if datetime.fromtimestamp(mktime(entry.published_parsed)) > since.replace(tzinfo=datetime.fromtimestamp(mktime(entry.published_parsed)).tzinfo)]