File size: 3,101 Bytes
3979178 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 |
import asyncio
from fastapi import FastAPI, Request
from starlette.middleware.cors import CORSMiddleware
from starlette.middleware.trustedhost import TrustedHostMiddleware
from fastapi.responses import JSONResponse
from core.config import get_settings
from core.logger import setup_logger
from core.refresh_token import TokenManager
from core.router import router
from core.scheduler import SchedulerManager
settings = get_settings()
logger = setup_logger(__name__)
scheduler_manager = SchedulerManager()
# print(settings.SECRET)
def create_app() -> FastAPI:
app = FastAPI(
title=settings.PROJECT_NAME,
version="0.0.1",
description=settings.DESCRIPTION,
)
# 配置中间件
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# # 添加可信主机中间件
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["*"] # 在生产环境中应该限制允许的主机
)
# API端点用于管理定时任务
@app.get("/scheduler/jobs")
async def list_jobs():
jobs = scheduler_manager.get_jobs()
return [{"id": job.id, "next_run_time": job.next_run_time} for job in jobs]
@app.post("/scheduler/jobs/{job_id}/trigger")
async def trigger_job(job_id: str):
job = scheduler_manager.scheduler.get_job(job_id)
if not job:
return {"error": "Job not found"}
await job.func() # 假设任务是异步的
return {"message": f"Job {job_id} triggered"}
@app.delete("/scheduler/jobs/{job_id}")
async def delete_job(job_id: str):
try:
scheduler_manager.remove_job(job_id)
return {"message": f"Job {job_id} removed"}
except Exception as e:
return {"error": str(e)}
# 添加路由
app.include_router(router, prefix="/api/v1")
app.include_router(router, prefix="/v1") # 兼容性路由
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
logger.error(f"An error occurred: {str(exc)}", exc_info=True)
return JSONResponse(
status_code=500,
content={
"message": "An internal server error occurred.",
"detail": str(exc)
},
)
# # 创建 TokenManager 实例
token_manager = TokenManager()
@app.on_event("startup")
async def startup_event():
# 在应用启动时创建任务
app.state.refresh_task = asyncio.create_task(token_manager.start_auto_refresh())
await scheduler_manager.start()
@app.on_event("shutdown")
async def shutdown_event():
# 在应用关闭时取消任务
if hasattr(app.state, 'refresh_task'):
app.state.refresh_task.cancel()
try:
await app.state.refresh_task
except asyncio.CancelledError:
pass
await scheduler_manager.shutdown()
return app
app = create_app() |