# ml/core/app.py
# (Repository page residue: "devin15's picture", "Upload 31 files", commit 3979178 verified.)
import asyncio
import inspect

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
from starlette.middleware.cors import CORSMiddleware
from starlette.middleware.trustedhost import TrustedHostMiddleware

from core.config import get_settings
from core.logger import setup_logger
from core.refresh_token import TokenManager
from core.router import router
from core.scheduler import SchedulerManager
# Module-level singletons shared by the application factory below.
settings = get_settings()  # project configuration; PROJECT_NAME and DESCRIPTION are read in create_app
logger = setup_logger(__name__)
# Scheduler wrapper used by the /scheduler/* endpoints and by the startup/shutdown hooks.
scheduler_manager = SchedulerManager()
# NOTE(review): leftover debug line `print(settings.SECRET)` - keep it disabled; secrets must not be written to stdout.
def create_app() -> FastAPI:
    """Build and configure the FastAPI application.

    Registers the CORS and trusted-host middleware, the scheduler management
    endpoints, the API router (under both ``/api/v1`` and ``/v1``), a global
    exception handler, and the startup/shutdown hooks that run the token
    auto-refresh task and the scheduler.

    Returns:
        The fully configured ``FastAPI`` instance.
    """
    app = FastAPI(
        title=settings.PROJECT_NAME,
        version="0.0.1",
        description=settings.DESCRIPTION,
    )

    # Middleware configuration.
    # NOTE(review): a wildcard origin combined with allow_credentials=True is
    # very permissive; consider an explicit origin list for production.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Trusted-host middleware.
    app.add_middleware(
        TrustedHostMiddleware,
        allowed_hosts=["*"],  # should be restricted to known hosts in production
    )

    # API endpoints for managing scheduled jobs.
    @app.get("/scheduler/jobs")
    async def list_jobs():
        """Return the id and next run time of every scheduled job."""
        return [
            {
                "id": job.id,
                # The attribute may be unset on a job object; report None
                # instead of failing the whole listing.
                "next_run_time": getattr(job, "next_run_time", None),
            }
            for job in scheduler_manager.get_jobs()
        ]

    @app.post("/scheduler/jobs/{job_id}/trigger")
    async def trigger_job(job_id: str):
        """Run the job's callable immediately, outside its schedule."""
        job = scheduler_manager.scheduler.get_job(job_id)
        if not job:
            # NOTE(review): reported with HTTP 200 to keep the existing
            # response contract; a 404 would be more conventional.
            return {"error": "Job not found"}

        # Forward the arguments the job was registered with; calling the
        # function bare would fail for any job that takes parameters.
        call_args = getattr(job, "args", None) or ()
        call_kwargs = getattr(job, "kwargs", None) or {}
        outcome = job.func(*call_args, **call_kwargs)

        # Jobs may be plain functions or coroutines; only await the latter.
        if inspect.isawaitable(outcome):
            await outcome
        return {"message": f"Job {job_id} triggered"}

    @app.delete("/scheduler/jobs/{job_id}")
    async def delete_job(job_id: str):
        """Remove a job from the scheduler."""
        try:
            scheduler_manager.remove_job(job_id)
        except Exception as e:
            # NOTE(review): failures are reported with HTTP 200 and the raw
            # exception text to keep the existing response contract.
            return {"error": str(e)}
        return {"message": f"Job {job_id} removed"}

    # Routers.
    app.include_router(router, prefix="/api/v1")
    app.include_router(router, prefix="/v1")  # compatibility route

    @app.exception_handler(Exception)
    async def global_exception_handler(request: Request, exc: Exception):
        """Log any unhandled exception and answer with a JSON 500 response."""
        logger.error(f"An error occurred: {str(exc)}", exc_info=True)
        return JSONResponse(
            status_code=500,
            content={
                "message": "An internal server error occurred.",
                # NOTE(review): exposes the exception text to the client;
                # consider dropping it outside of debug deployments.
                "detail": str(exc),
            },
        )

    # TokenManager instance whose auto-refresh loop runs for the app lifetime.
    token_manager = TokenManager()

    @app.on_event("startup")
    async def startup_event():
        """Start the token auto-refresh task and the scheduler."""
        # Keep a reference on app.state so the shutdown hook can cancel it.
        app.state.refresh_task = asyncio.create_task(
            token_manager.start_auto_refresh()
        )
        await scheduler_manager.start()

    @app.on_event("shutdown")
    async def shutdown_event():
        """Cancel the token auto-refresh task and stop the scheduler."""
        try:
            refresh_task = getattr(app.state, "refresh_task", None)
            if refresh_task is not None:
                refresh_task.cancel()
                try:
                    await refresh_task
                except asyncio.CancelledError:
                    pass
                except Exception:
                    # The task had already died with its own error; record it
                    # rather than letting it abort the rest of the shutdown.
                    logger.error(
                        "Token auto-refresh task ended with an error",
                        exc_info=True,
                    )
        finally:
            # Always stop the scheduler, whatever happened to the task above.
            await scheduler_manager.shutdown()

    return app
# Module-level application instance, built once at import time.
# NOTE(review): presumably the object an ASGI server is pointed at - confirm against the deployment command.
app = create_app()