45 lines
1.8 KiB
Python
45 lines
1.8 KiB
Python
import logging
|
|
from typing import Annotated
|
|
|
|
import requests
|
|
from fastapi import Depends, APIRouter, HTTPException, Request, Response, status
|
|
|
|
from settings.defaults import Settings, get_settings
|
|
|
|
router = APIRouter()
|
|
logger = logging.getLogger(__name__)
|
|
|
|
@router.get("/{account_id}", summary="Fetch Mastodon Statuses")
|
|
async def get_statuses(account_id: str, request:Request, settings: Annotated[Settings, Depends(get_settings)]):
|
|
|
|
if account_id not in settings.mastodon:
|
|
logger.error('Account %s not found', account_id)
|
|
raise HTTPException(status_code=status.HTTP_204_NO_CONTENT, detail="Account not found.")
|
|
|
|
mastodon_account = settings.mastodon[account_id]
|
|
mastodon_server = settings.mastodon_server
|
|
mastodon_get_statuses_url = f'{mastodon_server}/api/v1/accounts/{account_id}/statuses'
|
|
token = mastodon_account['token']
|
|
headers = {
|
|
'Authorization': f'Bearer {token}',
|
|
'Content-type': 'application/json',
|
|
'User-Agent': 'Serverless Feed'
|
|
}
|
|
query_params = dict(request.query_params)
|
|
response = requests.get(mastodon_get_statuses_url, headers=headers, params=query_params)
|
|
if response.status_code != 200:
|
|
logger.error('Failed to get statuses', response)
|
|
raise HTTPException(status_code=response.status_code, detail=response.text)
|
|
|
|
forwarded_headers = {
|
|
'Content-type': 'application/json',
|
|
'User-Agent': 'Serverless Feed'
|
|
}
|
|
|
|
if 'link' in response.headers:
|
|
forwarded_headers["Access-Control-Expose-Headers"] = 'link'
|
|
forwarded_headers['link'] = response.headers['link'].replace(mastodon_get_statuses_url, str(request.base_url) + 'statuses/' + account_id)
|
|
|
|
return Response(content=response.content, status_code=response.status_code, headers=forwarded_headers)
|
|
|