import time import os import datetime import requests from contextlib import asynccontextmanager from fastapi import FastAPI from fastapi_utilities import repeat_at my_data = list( str('-') * 10 ) START_STAT_URL = os.getenv('START_STAT_URL', '') @asynccontextmanager async def lifespan(app: FastAPI): # --- startup --- work() # send stat info if START_STAT_URL: requests.get(f'{START_STAT_URL}?add=HF_der-opt_worker_start_event') yield # --- shutdown --- app = FastAPI(lifespan=lifespan) @repeat_at(cron="*/15 * * * *") def work(): now_dtime = datetime.datetime.fromtimestamp(time.time()).strftime('%Y-%m-%d %H:%M:%S') my_data.append(now_dtime) my_data.pop(0) # trim old values print('-= work! =-') @app.get("/") def read_root(): return {"my_data": my_data}