Spaces:
Running
Running
File size: 3,518 Bytes
5d924ac 0df66dc 5d924ac ac35357 1c28833 0df66dc 8993ce0 41341e4 0df66dc ac35357 a2d105f c000986 1c28833 41341e4 1c28833 ac35357 a2d105f 5d924ac 1c28833 5d924ac a2d105f 5d924ac 0df66dc 5d924ac 1c28833 5d924ac 0df66dc ab09a10 0df66dc ab09a10 0df66dc 5d924ac 41341e4 5d924ac 1c28833 5d924ac 1c28833 c000986 8993ce0 0df66dc 5d924ac 0df66dc 8993ce0 5d924ac | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 | from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from contextlib import asynccontextmanager
from apscheduler.schedulers.background import BackgroundScheduler
import threading
import os
import logging
import itertools
from datetime import datetime, timedelta
from .database import get_cached_articles, Article
from .update_news import update_news
DEFAULT_UPDATE_HOURS = '0,6,12,18' # UTC time
FORCED_UPDATE_HOURS = 8 # Force update if articles are older than 8 hours
if os.environ.get('DEBUG', 'false') == 'true':
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG)
# set update interval from environment variable if available
update_schedule_hours = os.environ.get('UPDATE_HOURS', DEFAULT_UPDATE_HOURS)
# for updating the news feed periodically
scheduler = BackgroundScheduler()
# prevent initiating multiple concurrent updates
is_updating = threading.Lock()
def safe_update_news():
'''
Wrapper for update_news to ensure only one instance runs at a time.
'''
# check whether there is no update currently in progress
if not is_updating.locked():
# update news in background
with is_updating:
update_news()
@asynccontextmanager
async def lifespan(app: FastAPI):
# update news periodically
scheduler.add_job(
safe_update_news,
'cron',
hour=update_schedule_hours,
minute=0,
id='update_task',
replace_existing=True
)
scheduler.start()
yield
# stop the scheduler when closing the server
scheduler.shutdown()
app = FastAPI(lifespan=lifespan)
app.mount("/static", StaticFiles(directory="app/static"), name="static")
templates = Jinja2Templates(directory='app/templates')
if os.getenv('DEBUG') == 'true':
@app.get('/update')
async def update_articles(request: Request, background_tasks: BackgroundTasks):
background_tasks.add_task(safe_update_news)
return {'response': 'ok'}
def group_articles_by_category(articles: list[Article]) -> dict:
sorted_articles = sorted(articles, key=lambda a: a.category)
grouped_articles = {}
for category, category_articles in itertools.groupby(sorted_articles, lambda a: a.category):
grouped_articles[category] = list(category_articles)
return grouped_articles
@app.get('/')
async def read_root(request: Request, background_tasks: BackgroundTasks):
# retrieve articles from database
articles = get_cached_articles()
# how many hours since the last update
last_updated_hours = -1
# no articles yet
if not articles:
# update news in background
background_tasks.add_task(safe_update_news)
else:
last_updated_hours = int((datetime.now() - articles[0].date).total_seconds() // 3600)
# Force update if articles are older than treshold (due to missed/failed update)
if last_updated_hours >= FORCED_UPDATE_HOURS:
background_tasks.add_task(safe_update_news)
categorized_articles = group_articles_by_category(articles)
return templates.TemplateResponse(
'index.html',
{
'request': request,
'categorized_articles': categorized_articles,
'update_time': -1 if is_updating.locked() else last_updated_hours
}
)
|