import logging
import re
import traceback
from datetime import datetime, timezone
from typing import Annotated

import feedparser
import requests
from fastapi import APIRouter, Depends
from openai import OpenAI

from ai.prompts import GENERATE_TAGS_PROMPT, TRANSLATE_ML_PROMPT
from settings.defaults import Settings, get_settings

router = APIRouter()
logger = logging.getLogger(__name__)

# Timeout in seconds for outgoing HTTP calls. The value is an assumption;
# without a timeout, a stalled server would hang the request indefinitely.
REQUEST_TIMEOUT = 30


@router.get("/yle_en", summary="Yle.fi News RSS")
async def update(settings: Annotated[Settings, Depends(get_settings)]):
    """Post every feed entry published since the account's latest status."""
    feed_url = settings.feeds['yle_en']['url']
    mastodon_server = settings.mastodon_server
    mastodon_aid = settings.feeds['yle_en']['account_id']
    mastodon_token = str(settings.feeds['yle_en']['token'])
    mastodon_get_statuses_url = f'{mastodon_server}/api/v1/accounts/{mastodon_aid}/statuses'
    mastodon_post_statuses_url = f'{mastodon_server}/api/v1/statuses'

    try:
        last_status = load_last_status(mastodon_get_statuses_url, mastodon_token)
        # Mastodon timestamps end in "Z", which fromisoformat() rejects
        # before Python 3.11, so spell the UTC offset out explicitly.
        last_status_timestamp = datetime.fromisoformat(
            last_status['created_at'].replace('Z', '+00:00')
        )
        new_entries = load_feed_rss(feed_url, last_status_timestamp)
        logger.info(f'Found {len(new_entries)} new entries since {last_status_timestamp}')

        if len(new_entries) == 0:
            return {"status": 200, "body": {"posted_entries": 0, "successful": True}}

        # One response per attempted post; failed posts are logged, not raised.
        posted_entries = [
            post_rss_entry_to_mastodon(mastodon_post_statuses_url, mastodon_token, entry, settings)
            for entry in new_entries
        ]
        return {"status": 200, "body": {"posted_entries": len(posted_entries), "successful": True}}
    except Exception as e:
        msg = ''.join(traceback.format_exception_only(e))
        logger.error(msg)
        return {"status": 501, "body": {"posted_entries": 0, "message": msg, "successful": False}}


def load_last_status(url, token):
    """Return the account's most recent status as a dict."""
    response = requests.get(
        url,
        params={'limit': 1},
        headers={'Authorization': f'Bearer {token}'},
        timeout=REQUEST_TIMEOUT,
    )
    if response.status_code != 200:
        raise Exception('Failed to contact Mastodon', response.text)
    # Raises IndexError if the account has not posted anything yet.
    return response.json()[0]


def post_rss_entry_to_mastodon(url: str, token: str, entry, settings: Settings):
    """Post one feed entry as a public status and return the HTTP response."""
    title = entry.title
    description = entry.summary
    # Drop the query string from the link.
    link = entry.link.split('?', 1)[0]

    categories = []
    if 'tags' in entry:
        terms = [t.get('term') for t in entry.tags if t.get('term')]
        # Keep tags of at least three characters, counting the leading '#'.
        categories = [tag for tag in sanitize_tags(terms) if len(tag) >= 3]

    # Translate the feed's own categories when there are more than three;
    # otherwise generate tags from the description.
    if len(categories) > 3:
        tags = translate_tags(categories, settings)
    else:
        tags = generate_tags(description, 5, settings)
    message = f"{title}\n\n{description}\n\n{link}\n\n{tags}"

    headers = {
        'Authorization': f'Bearer {token}',
        'Content-type': 'application/x-www-form-urlencoded',
        'User-Agent': 'Serverless Feed',
    }
    params = {'status': message, 'language': 'en', 'visibility': 'public'}
    response = requests.post(url, data=params, headers=headers, timeout=REQUEST_TIMEOUT)
    if response.status_code != 200:
        logger.error('Failed to post message: %s %s', response.status_code, response.text)
    return response


def load_feed_rss(url, since):
    """Return the feed entries published after `since`."""
    # feedparser normalises published_parsed to UTC, so compare in UTC.
    # A naive `since` is treated as UTC.
    if since.tzinfo is None:
        since = since.replace(tzinfo=timezone.utc)
    feed = feedparser.parse(url)
    return [
        entry for entry in feed.entries
        if datetime(*entry.published_parsed[:6], tzinfo=timezone.utc) > since
    ]


def generate_tags(text: str, num_tags: int, settings: Settings):
    """Ask the model for `num_tags` hashtags describing `text`; return '' on failure."""
    try:
        client = OpenAI(api_key=settings.openai_api_key)
        result = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": GENERATE_TAGS_PROMPT % (num_tags, text)}],
        )
        return result.choices[0].message.content
    except Exception as e:
        logger.error('Failed to generate tags: %s', e)
        return ''


def translate_tags(tags: list, settings: Settings):
    """Ask the model to translate the given hashtags; return '' on failure."""
    try:
        client = OpenAI(api_key=settings.openai_api_key)
        result = client.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": TRANSLATE_ML_PROMPT % " ".join(tags)}],
        )
        return result.choices[0].message.content
    except Exception as e:
        logger.error('Failed to translate tags: %s', e)
        return ''


def sanitize_tags(categories):
    """Turn raw feed category terms into a list of hashtags."""
    tags = []
    for item in categories:
        # A single term may hold several categories separated by '='.
        for part in item.split('='):
            part = re.sub(r'\s+', '', part)
            # Strip digits, dots, dashes, brackets and '#'.
            part = re.sub(r'[0-9.–\-()<>{}#]+', '', part)
            part = part.replace('&', 'And')
            # capitalize() also lowercases everything after the first character.
            tags.append(f'#{part.capitalize()}')
    return tags