| import os, uuid, json, asyncio, time
|
| from fastapi import FastAPI, UploadFile, File, Form, BackgroundTasks, HTTPException
|
| from fastapi.responses import StreamingResponse
|
| from fastapi.middleware.cors import CORSMiddleware
|
| from fastapi.staticfiles import StaticFiles
|
| from pathlib import Path
|
| from backend.core.engine import ZenithAnalyzer, VideoDownloader, OUTPUT_DIR
|
|
|
| app = FastAPI(title="Zenith AI API")
|
|
|
|
|
| app.add_middleware(
|
| CORSMiddleware,
|
| allow_origins=["*"],
|
| allow_credentials=True,
|
| allow_methods=["*"],
|
| allow_headers=["*"],
|
| )
|
|
|
| analyzer = ZenithAnalyzer()
|
|
|
| def cleanup_old_sessions(max_age_hours=1):
|
| """Supprime les dossiers de session plus vieux que max_age_hours."""
|
| try:
|
| import shutil
|
| now = time.time()
|
| for path in OUTPUT_DIR.glob("session_*"):
|
| if path.is_dir():
|
|
|
| if (now - path.stat().st_mtime) > (max_age_hours * 3600):
|
| shutil.rmtree(path)
|
| except Exception as e:
|
| print(f"Erreur lors du nettoyage : {e}")
|
|
|
| @app.post("/analyze/url")
|
| async def analyze_url(url: str = Form(...), background_tasks: BackgroundTasks = BackgroundTasks()):
|
| background_tasks.add_task(cleanup_old_sessions)
|
| session_id = str(uuid.uuid4())
|
| session_dir = OUTPUT_DIR / f"session_{session_id}"
|
| session_dir.mkdir(parents=True, exist_ok=True)
|
|
|
| video_path = VideoDownloader.download(url, session_dir)
|
| if not video_path:
|
| raise HTTPException(status_code=400, detail="Échec du téléchargement de la vidéo")
|
|
|
|
|
| video_url = f"/output/session_{session_id}/{video_path.name}"
|
| return {"session_id": session_id, "video_path": str(video_path), "video_url": video_url}
|
|
|
| @app.post("/analyze/upload")
|
| async def analyze_upload(file: UploadFile = File(...), background_tasks: BackgroundTasks = BackgroundTasks()):
|
| background_tasks.add_task(cleanup_old_sessions)
|
| session_id = str(uuid.uuid4())
|
| session_dir = OUTPUT_DIR / f"session_{session_id}"
|
| session_dir.mkdir(parents=True, exist_ok=True)
|
|
|
| file_path = session_dir / file.filename
|
| with open(file_path, "wb") as buffer:
|
| buffer.write(await file.read())
|
|
|
|
|
| video_url = f"/output/session_{session_id}/{file.filename}"
|
| return {"session_id": session_id, "video_path": str(file_path), "video_url": video_url}
|
|
|
| @app.post("/analyze/extract-frames")
|
| async def extract_frames(session_id: str = Form(...), video_path: str = Form(...)):
|
|
|
| abs_video_path = Path(video_path).resolve()
|
| abs_output_dir = OUTPUT_DIR.resolve()
|
|
|
| if not str(abs_video_path).startswith(str(abs_output_dir)):
|
| raise HTTPException(status_code=403, detail="Accès au fichier non autorisé")
|
|
|
| try:
|
| frames = await analyzer.extract_frames_only(abs_video_path, session_id)
|
| return {"status": "success", "frames": frames}
|
| except Exception as e:
|
| raise HTTPException(status_code=500, detail=str(e))
|
|
|
| @app.get("/stream/{session_id}")
|
| async def stream_analysis(session_id: str, video_path: str, prompt: str = None):
|
|
|
| abs_video_path = Path(video_path).resolve()
|
| abs_output_dir = OUTPUT_DIR.resolve()
|
|
|
| if not str(abs_video_path).startswith(str(abs_output_dir)):
|
| raise HTTPException(status_code=403, detail="Accès au fichier non autorisé")
|
|
|
| async def event_generator():
|
| try:
|
| async for update in analyzer.run_full_analysis(abs_video_path, session_id, custom_prompt=prompt):
|
| yield f"data: {json.dumps(update)}\n\n"
|
| except Exception as e:
|
| yield f"data: {json.dumps({'error': str(e)})}\n\n"
|
|
|
| return StreamingResponse(event_generator(), media_type="text/event-stream")
|
|
|
|
|
| app.mount("/output", StaticFiles(directory=str(OUTPUT_DIR)), name="output")
|
|
|
|
|
| frontend_path = Path("frontend")
|
| if frontend_path.exists():
|
| app.mount("/", StaticFiles(directory="frontend", html=True), name="frontend")
|
| else:
|
| @app.get("/")
|
| async def root():
|
| import os
|
| return {
|
| "status": "Zenith AI API is running",
|
| "frontend": "hosted externally",
|
| "diagnostics": {
|
| "deepseek_configured": "YES",
|
| "yolo_available": "YES" if analyzer.yolo else "NO",
|
| "whisper_available": "YES" if analyzer.audio_proc else "NO"
|
| }
|
| }
|
|
|
| if __name__ == "__main__":
|
| import uvicorn
|
| uvicorn.run(app, host="0.0.0.0", port=8000)
|
|
|