Files
mastobot/app/routers/yle_rss_en.py
John Ahlroos ba97013c27
Some checks failed
Build & Release / build-docker-image (push) Failing after 4m15s
Build & Release / deploy-to-production (push) Has been skipped
Initial version of app
2026-02-17 13:46:10 +01:00

157 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import traceback
import json
import requests
import traceback
import json
import re
import feedparser
import requests
import logging
from datetime import datetime
from time import mktime
from typing import Annotated
from fastapi import Depends, APIRouter
from openai import OpenAI
from settings.defaults import Settings, get_settings
from ai.prompts import GENERATE_TAGS_PROMPT, TRANSLATE_ML_PROMPT
# Router that collects this module's endpoints (the /yle_en route is registered on it below).
router = APIRouter()
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@router.get("/yle_en", summary="Yle.fi News RSS")
async def update(settings: Annotated[Settings, Depends(get_settings)]):
    """Post feed entries newer than the account's latest status to Mastodon.

    Reads the ``yle_en`` feed configuration from ``settings``, looks up the
    timestamp of the account's most recent status, and posts every entry that
    ``load_feed_rss`` returns for that timestamp.

    Returns:
        A dict with ``status`` and ``body`` keys. On success ``status`` is 200
        and ``body.posted_entries`` is the number of entries handed to
        ``post_rss_entry_to_mastodon``. Any exception raised while loading or
        posting is logged and reported as ``status`` 501 with the formatted
        exception in ``body.message``.
    """
    feed_config = settings.feeds['yle_en']
    feed_url = feed_config['url']
    server = settings.mastodon_server
    account_id = feed_config['account_id']
    token = str(feed_config['token'])

    statuses_url = f'{server}/api/v1/accounts/{account_id}/statuses'
    post_url = f'{server}/api/v1/statuses'

    try:
        latest_status = load_last_status(statuses_url, token)
        last_status_timestamp = datetime.fromisoformat(latest_status['created_at'])
        new_entries = load_feed_rss(feed_url, last_status_timestamp)
        logger.info(f'Found {len(new_entries)} new entries since {last_status_timestamp}')
        # An empty feed simply yields an empty list here, i.e. posted_entries == 0.
        responses = [
            post_rss_entry_to_mastodon(post_url, token, entry, settings)
            for entry in new_entries
        ]
    except Exception as e:
        msg = ''.join(traceback.format_exception_only(e))
        logger.error(msg)
        return {
            "status": 501,
            "body": {
                "posted_entries": 0,
                "message": msg,
                "successful": False
            }
        }

    return {
        "status": 200,
        "body": {
            "posted_entries": len(responses),
            "successful": True
        }
    }
def load_last_status(url, token, timeout=10):
    """Fetch the most recent status from a Mastodon statuses endpoint.

    Args:
        url: Statuses endpoint to query.
        token: Bearer token sent in the ``Authorization`` header.
        timeout: Seconds to wait for the server before ``requests`` gives up,
            so an unresponsive server cannot block the caller indefinitely.

    Returns:
        The first element of the JSON array in the response body.

    Raises:
        Exception: If the server answers with a non-200 status code.
        IndexError: If the response contains no statuses.
    """
    response = requests.get(
        url,
        # Let requests build the query string instead of concatenating it.
        params={'limit': 1},
        headers={'Authorization': f'Bearer {token}'},
        timeout=timeout,
    )
    if response.status_code != 200:
        raise Exception('Failed to contact Mastodon', response.text)

    statuses = json.loads(response.text)
    if not statuses:
        # Same exception type as indexing an empty list, but with a message
        # that explains what actually went wrong.
        raise IndexError('Mastodon returned no statuses for this account')
    return statuses[0]
def post_rss_entry_to_mastodon(url: str, token: str, entry, settings: Settings, timeout: float = 10):
    """Post a single feed entry as a public English status.

    The status text is the entry title, summary, link (without its query
    string) and a line of tags. When the entry carries more than three usable
    tags they are passed through ``translate_tags``; otherwise five tags are
    requested from ``generate_tags`` based on the summary.

    Args:
        url: Endpoint the status is posted to.
        token: Bearer token sent in the ``Authorization`` header.
        entry: Feed entry providing ``title``, ``summary``, ``link`` and,
            optionally, ``tags``.
        settings: Application settings forwarded to the tag helpers.
        timeout: Seconds to wait for the server before ``requests`` gives up.

    Returns:
        The ``requests`` response object, whether or not the post succeeded.
    """
    title = entry.title
    description = entry.summary
    # Keep only the part of the link before the first '?'.
    link = entry.link.split('?', 1)[0]

    # None means "no tags chosen yet"; an empty string returned by
    # translate_tags is kept as-is and does not trigger the fallback.
    tags = None
    if 'tags' in entry:
        # Tags without a term cannot be sanitized, so they are skipped.
        terms = [term for term in (t.get('term') for t in entry.tags) if term]
        categories = [tag for tag in sanitize_tags(terms) if len(tag) >= 3]
        if len(categories) > 3:
            tags = translate_tags(categories, settings)
    if tags is None:
        tags = generate_tags(description, 5, settings)

    message = f"{title}\n\n{description}\n\n{link}\n\n{tags}"

    headers = {
        'Authorization': f'Bearer {token}',
        'Content-type': 'application/x-www-form-urlencoded',
        'User-Agent': 'Serverless Feed'
    }
    params = {
        'status': message,
        'language': 'en',
        'visibility': 'public'
    }
    response = requests.post(url, data=params, headers=headers, timeout=timeout)
    if response.status_code != 200:
        # Report through the module logger rather than stdout.
        logger.error('Failed to post message: %s %s', response.status_code, response.text)
    return response
def load_feed_rss(url, since):
    """Return the entries of the feed at ``url`` published after ``since``.

    Publication times are compared as timezone-aware UTC datetimes, so the
    result does not depend on the host's local timezone or DST state.

    Args:
        url: Location of the feed, passed to ``feedparser.parse``.
        since: Cutoff ``datetime``. A naive value is taken to be UTC.

    Returns:
        A list of feed entries whose ``published_parsed`` time is strictly
        later than ``since``. Entries without a publication time are skipped.
    """
    import calendar
    from datetime import timezone

    if since.tzinfo is None:
        since = since.replace(tzinfo=timezone.utc)

    feed = feedparser.parse(url)
    new_entries = []
    for entry in feed.entries:
        published = entry.get('published_parsed')
        if published is None:
            # Without a date there is nothing to compare against.
            continue
        # feedparser documents *_parsed values as UTC struct_time, so convert
        # with timegm (UTC) rather than mktime (local time).
        published_at = datetime.fromtimestamp(calendar.timegm(published), tz=timezone.utc)
        if published_at > since:
            new_entries.append(entry)
    return new_entries
def generate_tags(text: str, num_tags: int, settings: Settings):
    """Ask the OpenAI chat completions API to generate tags for ``text``.

    Args:
        text: Text the tags are generated from.
        num_tags: Number of tags requested in the prompt.
        settings: Application settings providing ``openai_api_key``.

    Returns:
        The content of the first completion choice, or an empty string when
        the request fails or the completion has no content.
    """
    try:
        client = OpenAI(api_key=settings.openai_api_key)
        result = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": GENERATE_TAGS_PROMPT % (num_tags, text)}]
        )
        # Never hand None back to the caller, which interpolates the result
        # straight into the status text.
        return result.choices[0].message.content or ''
    except Exception:
        # Best effort: a status without tags is better than no status.
        # logger.exception records the message together with the traceback.
        logger.exception('Failed to generate tags')
        return ''
def translate_tags(tags: list, settings: Settings):
    """Send ``tags`` through the OpenAI chat completions API using the translation prompt.

    Args:
        tags: Tag strings; they are joined with single spaces for the prompt.
        settings: Application settings providing ``openai_api_key``.

    Returns:
        The content of the first completion choice, or an empty string when
        the request fails or the completion has no content.
    """
    try:
        client = OpenAI(api_key=settings.openai_api_key)
        joined_tags = " ".join(tags)
        result = client.chat.completions.create(
            model="gpt-3.5-turbo",
            # Explicit 1-tuple so the %-operand is unambiguous.
            messages=[{"role": "user", "content": TRANSLATE_ML_PROMPT % (joined_tags,)}]
        )
        # Never hand None back to the caller, which interpolates the result
        # straight into the status text.
        return result.choices[0].message.content or ''
    except Exception:
        # Best effort: a status without tags is better than no status.
        # logger.exception records the message together with the traceback.
        logger.exception('Failed to translate tags')
        return ''
def sanitize_tags(categories):
    """Turn raw category strings into hashtag strings.

    Each category is split on ``=``; every part then has whitespace, digits
    and the characters ``. \\ - ( ) < > { } #`` removed, ``&`` replaced by
    ``And``, is capitalized (first letter upper case, rest lower case) and is
    prefixed with ``#``.

    Args:
        categories: Iterable of category strings. ``None`` items are skipped.

    Returns:
        A list of hashtag strings, in input order. A part that is empty after
        cleaning yields a bare ``#``; filtering those is left to the caller.
    """
    hashtags = []
    for item in categories:
        if item is None:
            continue
        for part in item.split('='):
            cleaned = re.sub(r'\s+', '', part)
            # The hyphen is placed last in the class so it is a literal and
            # cannot form an (invalid) character range.
            cleaned = re.sub(r'[0-9.\\()<>{}#-]+', '', cleaned)
            cleaned = cleaned.replace('&', 'And')
            hashtags.append(f'#{cleaned.capitalize()}')
    return hashtags