Files
mastobot/app/routers/the_local.py

119 lines
3.9 KiB
Python
Raw Permalink Normal View History

2026-02-13 18:18:14 +01:00
import traceback
import json
import requests
import traceback
import json
import re
import feedparser
import requests
import logging
from datetime import datetime
from time import mktime
from typing import Annotated
from fastapi import Depends, APIRouter
from settings.defaults import Settings, get_settings
# Router that the endpoint(s) defined in this module are registered on.
router = APIRouter()
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
@router.get("/the_local", summary="The Local RSS")
async def update(settings: Annotated[Settings, Depends(get_settings)]):
    """Post feed entries newer than the account's latest status to Mastodon.

    Reads the ``the_local`` feed configuration from ``settings``, looks up
    the creation time of the account's most recent status, and posts every
    feed entry published after it.

    Args:
        settings: Application settings; ``settings.feeds['the_local']`` must
            provide ``url``, ``account_id`` and ``token``, and
            ``settings.mastodon_server`` the server base URL.

    Returns:
        A dict with ``status`` and ``body``. On success ``status`` is 200 and
        ``body`` holds ``posted_entries`` (number of entries a post was
        attempted for) and ``successful: True``. On any error ``status`` is
        501 and ``body`` additionally carries the exception text in
        ``message`` with ``successful: False``.
    """
    feed_settings = settings.feeds['the_local']
    feed_url = feed_settings['url']
    mastodon_server = settings.mastodon_server
    mastodon_aid = feed_settings['account_id']
    mastodon_token = str(feed_settings['token'])
    mastodon_get_statuses_url = f'{mastodon_server}/api/v1/accounts/{mastodon_aid}/statuses'
    mastodon_post_statuses_url = f'{mastodon_server}/api/v1/statuses'

    try:
        last_status = load_last_status(mastodon_get_statuses_url, mastodon_token)
        created_at = last_status['created_at']
        # ``datetime.fromisoformat`` only accepts a trailing "Z" from
        # Python 3.11 on; normalise it so older interpreters work too.
        if created_at.endswith('Z'):
            created_at = created_at[:-1] + '+00:00'
        last_status_timestamp = datetime.fromisoformat(created_at)

        new_entries = load_feed_rss(feed_url, last_status_timestamp)
        logger.info('Found %d new entries since %s', len(new_entries), last_status_timestamp)

        # An empty ``new_entries`` simply yields zero posts.
        posted_entries = [
            post_rss_entry_to_mastodon(mastodon_post_statuses_url, mastodon_token, entry)
            for entry in new_entries
        ]
    except Exception as exc:
        # Top-level boundary: log the full traceback and report the failure
        # in the response body instead of letting the request crash.
        logger.exception('Updating the_local feed failed')
        return {
            "status": 501,
            "body": {
                "posted_entries": 0,
                # format_exception_only() requires the exception to format;
                # calling it without arguments raises TypeError.
                "message": ''.join(traceback.format_exception_only(type(exc), exc)),
                "successful": False
            }
        }

    return {
        "status": 200,
        "body": {
            "posted_entries": len(posted_entries),
            "successful": True
        }
    }
def load_last_status(url, token):
    """Return the most recent status of a Mastodon account.

    Args:
        url: The account's statuses endpoint URL.
        token: Bearer token sent in the ``Authorization`` header.

    Returns:
        The first element of the JSON array returned by the endpoint.

    Raises:
        Exception: If the server does not answer with HTTP 200.
        IndexError: If the account has no statuses.
    """
    response = requests.get(
        url,
        params={'limit': 1},
        headers={'Authorization': f'Bearer {token}'},
        # requests waits forever by default; bound the call so a stalled
        # server cannot hang the caller indefinitely.
        timeout=30,
    )
    if response.status_code != 200:
        raise Exception('Failed to contact Mastodon', response.text)

    statuses = json.loads(response.text)
    if not statuses:
        raise IndexError('Mastodon returned no statuses for this account')
    return statuses[0]
# Characters removed from a feed category before it becomes a hashtag:
# whitespace, digits, dots, hyphens and parentheses.
_HASHTAG_STRIP_RE = re.compile(r'[\s0-9.\-()]+')


def _entry_hashtags(entry):
    """Build the list of ``#Hashtag`` strings for a feed entry's tags."""
    if 'tags' not in entry:
        return []

    hashtags = []
    for tag in entry.tags:
        term = tag.get('term')
        if not term:
            # Tags without a term cannot be turned into a hashtag.
            continue
        cleaned = _HASHTAG_STRIP_RE.sub('', term).capitalize()
        # Very short leftovers make for meaningless hashtags; drop them.
        if len(cleaned) < 3:
            continue
        hashtags.append(cleaned if cleaned.startswith('#') else f'#{cleaned}')
    return hashtags


def post_rss_entry_to_mastodon(url, token, entry):
    """Post a single feed entry as a public Mastodon status.

    The status text is the entry's title, summary and link separated by
    blank lines, followed by a line of hashtags derived from the entry's
    tags when any usable tag exists.

    Args:
        url: The Mastodon endpoint that statuses are POSTed to.
        token: Bearer token sent in the ``Authorization`` header.
        entry: Feed entry exposing ``title``, ``summary``, ``link`` and
            optionally ``tags`` (items with a ``term`` key).

    Returns:
        The ``requests`` response object of the POST, whether or not the
        server reported success; failures are logged, not raised.
    """
    title = entry.title
    description = entry.summary
    link = entry.link

    message = f"{title}\n\n{description}\n\n{link}"
    hashtags = _entry_hashtags(entry)
    if hashtags:
        message = f"{message}\n\n{' '.join(hashtags)}"

    headers = {
        'Authorization': f'Bearer {token}',
        'Content-type': 'application/x-www-form-urlencoded',
        'User-Agent': 'Serverless Feed'
    }
    params = {
        'status': message,
        'language': 'en',
        'visibility': 'public'
    }

    # Logging arguments need a matching %-placeholder in the message.
    logger.info('posting to %s', url)
    # Bound the call so a stalled server cannot hang the caller indefinitely.
    response = requests.post(url, data=params, headers=headers, timeout=30)
    if response.status_code != 200:
        logger.error('Failed to post message: %s %s', response.status_code, response.text)
    return response
def load_feed_rss(url, since):
    """Return the entries of the feed at ``url`` published after ``since``.

    Args:
        url: Location of the RSS feed, passed to ``feedparser.parse``.
        since: Cut-off ``datetime``. A naive value is interpreted as UTC.

    Returns:
        A list of the feed entries whose publication time is strictly later
        than ``since``. Entries without a parsed publication date are
        skipped, since they cannot be compared against the cut-off.
    """
    # Imported here because the file header only brings in ``datetime``.
    from calendar import timegm
    from datetime import timezone

    if since.tzinfo is None:
        since = since.replace(tzinfo=timezone.utc)

    feed = feedparser.parse(url)
    new_entries = []
    for entry in feed.entries:
        published = entry.get('published_parsed')
        if published is None:
            continue
        # feedparser normalises parsed dates to UTC, so convert with
        # ``timegm`` (UTC) rather than ``mktime`` (local time) and compare
        # timezone-aware values.
        published_at = datetime.fromtimestamp(timegm(published), tz=timezone.utc)
        if published_at > since:
            new_entries.append(entry)
    return new_entries