# Zenith-AI / backend/main.py
# Uploaded by Shads229 ("Upload 16 files", commit 6d18217, verified)
import os, uuid, json, asyncio, time
from fastapi import FastAPI, UploadFile, File, Form, BackgroundTasks, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pathlib import Path
from backend.core.engine import ZenithAnalyzer, VideoDownloader, OUTPUT_DIR
# Application instance; the route handlers and static mounts in this module are registered on it.
app = FastAPI(title="Zenith AI API")

# Broad CORS configuration so that a frontend hosted elsewhere can talk to the API.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive — confirm this is intended before exposing the API publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # "*" can later be replaced by the URL of the Vercel site for better security
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Single analyzer instance shared by the endpoints of this module.
analyzer = ZenithAnalyzer()
def cleanup_old_sessions(max_age_hours=1):
    """Delete session directories older than ``max_age_hours``.

    Scans ``OUTPUT_DIR`` for entries named ``session_*`` and removes every
    directory whose modification time is more than ``max_age_hours`` hours in
    the past. Entries that are not directories are left untouched.

    The cleanup is best-effort: it never raises. Errors are handled per
    directory so that one entry that cannot be inspected or removed does not
    prevent the remaining stale sessions from being purged.

    Args:
        max_age_hours: Maximum age, in hours, a session directory may reach
            before it is deleted.
    """
    import shutil

    max_age_seconds = max_age_hours * 3600
    now = time.time()

    try:
        candidates = list(OUTPUT_DIR.glob("session_*"))
    except Exception as e:
        print(f"Erreur lors du nettoyage : {e}")
        return

    for path in candidates:
        try:
            if path.is_dir() and (now - path.stat().st_mtime) > max_age_seconds:
                shutil.rmtree(path)
        except FileNotFoundError:
            # Another cleanup task (one is scheduled per request) already
            # removed this directory; nothing left to do for it.
            continue
        except Exception as e:
            # Report the failure and keep going with the other sessions.
            print(f"Erreur lors du nettoyage : {e}")
@app.post("/analyze/url")
async def analyze_url(url: str = Form(...), background_tasks: BackgroundTasks = BackgroundTasks()):
    """Download the video at ``url`` into a new session directory.

    Args:
        url: Form field holding the address of the video to download.
        background_tasks: Task collector; a cleanup of stale sessions is
            queued on it.

    Returns:
        A dict with the generated ``session_id``, the server-side
        ``video_path`` and the relative ``video_url`` under ``/output``.

    Raises:
        HTTPException: 400 when the downloader returns no file.
    """
    background_tasks.add_task(cleanup_old_sessions)

    session_id = str(uuid.uuid4())
    session_dir = OUTPUT_DIR / f"session_{session_id}"
    session_dir.mkdir(parents=True, exist_ok=True)

    # The downloader is a plain synchronous call (its result is used directly),
    # so run it in the default executor: calling it inline would stall the
    # event loop, and every other request, for the duration of the download.
    loop = asyncio.get_running_loop()
    video_path = await loop.run_in_executor(None, VideoDownloader.download, url, session_dir)
    if not video_path:
        raise HTTPException(status_code=400, detail="Échec du téléchargement de la vidéo")

    # Relative URL the frontend can use to fetch the file through the /output mount.
    video_url = f"/output/session_{session_id}/{video_path.name}"
    return {"session_id": session_id, "video_path": str(video_path), "video_url": video_url}
@app.post("/analyze/upload")
async def analyze_upload(file: UploadFile = File(...), background_tasks: BackgroundTasks = BackgroundTasks()):
    """Store an uploaded video in a new session directory.

    Args:
        file: The uploaded file.
        background_tasks: Task collector; a cleanup of stale sessions is
            queued on it.

    Returns:
        A dict with the generated ``session_id``, the server-side
        ``video_path`` and the relative ``video_url`` under ``/output``.

    Raises:
        HTTPException: 400 when the upload carries no usable file name.
    """
    background_tasks.add_task(cleanup_old_sessions)

    # The file name is supplied by the client: keep only its final component so
    # that values such as "../../x" or an absolute path cannot make the write
    # land outside the session directory. Backslashes are normalised first so
    # Windows-style paths are reduced to their last component as well.
    raw_name = file.filename or ""
    safe_name = Path(raw_name.replace("\\", "/")).name
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Nom de fichier invalide")

    session_id = str(uuid.uuid4())
    session_dir = OUTPUT_DIR / f"session_{session_id}"
    session_dir.mkdir(parents=True, exist_ok=True)

    file_path = session_dir / safe_name
    chunk_size = 1024 * 1024
    with open(file_path, "wb") as buffer:
        # Copy in fixed-size chunks instead of loading the whole video in memory.
        while True:
            chunk = await file.read(chunk_size)
            if not chunk:
                break
            buffer.write(chunk)

    # Relative URL the frontend can use to fetch the file through the /output mount.
    video_url = f"/output/session_{session_id}/{safe_name}"
    return {"session_id": session_id, "video_path": str(file_path), "video_url": video_url}
@app.post("/analyze/extract-frames")
async def extract_frames(session_id: str = Form(...), video_path: str = Form(...)):
    """Extract frames from a previously stored video.

    Args:
        session_id: Form field identifying the session the video belongs to.
        video_path: Form field holding the server-side path of the video, as
            returned by the ``/analyze/url`` and ``/analyze/upload`` endpoints.

    Returns:
        A dict with ``status`` set to ``"success"`` and the ``frames`` value
        produced by the analyzer.

    Raises:
        HTTPException: 403 when ``video_path`` is not located inside
            ``OUTPUT_DIR``; 500 when frame extraction fails.
    """
    # Security: the resolved path must live inside the output directory.
    # A string prefix test is not enough here, because a sibling such as
    # "<OUTPUT_DIR>_other/file" shares the prefix; compare path ancestors instead.
    abs_video_path = Path(video_path).resolve()
    abs_output_dir = OUTPUT_DIR.resolve()
    if abs_output_dir not in abs_video_path.parents:
        raise HTTPException(status_code=403, detail="Accès au fichier non autorisé")

    try:
        frames = await analyzer.extract_frames_only(abs_video_path, session_id)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"status": "success", "frames": frames}
@app.get("/stream/{session_id}")
async def stream_analysis(session_id: str, video_path: str, prompt: str = None):
    """Stream the full analysis of a stored video as server-sent events.

    Args:
        session_id: Path parameter identifying the session.
        video_path: Query parameter holding the server-side path of the video.
        prompt: Optional query parameter forwarded to the analyzer as
            ``custom_prompt``.

    Returns:
        A ``text/event-stream`` response in which every analyzer update is sent
        as one JSON-encoded ``data:`` event. If the analysis raises, a final
        event of the form ``{"error": "<message>"}`` is emitted instead.

    Raises:
        HTTPException: 403 when ``video_path`` is not located inside
            ``OUTPUT_DIR``.
    """
    # Security: the resolved path must live inside the output directory.
    # A string prefix test is not enough here, because a sibling such as
    # "<OUTPUT_DIR>_other/file" shares the prefix; compare path ancestors instead.
    abs_video_path = Path(video_path).resolve()
    abs_output_dir = OUTPUT_DIR.resolve()
    if abs_output_dir not in abs_video_path.parents:
        raise HTTPException(status_code=403, detail="Accès au fichier non autorisé")

    async def event_generator():
        try:
            async for update in analyzer.run_full_analysis(abs_video_path, session_id, custom_prompt=prompt):
                yield f"data: {json.dumps(update)}\n\n"
        except Exception as e:
            # The response has already started, so failures are reported in-band.
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")
# Serve the output directory so that extracted images (and the stored videos
# referenced by the "video_url" values returned above) are reachable under /output.
# NOTE(review): presumably OUTPUT_DIR must already exist when this module is imported — confirm.
app.mount("/output", StaticFiles(directory=str(OUTPUT_DIR)), name="output")
# Serve the static frontend only if it exists; the relative "frontend" path is
# resolved against the current working directory of the process.
frontend_path = Path("frontend")
if frontend_path.exists():
    app.mount("/", StaticFiles(directory="frontend", html=True), name="frontend")
else:
    @app.get("/")
    async def root():
        """Return an API status payload with basic diagnostics.

        Registered only when no local ``frontend`` directory is present.
        """
        return {
            "status": "Zenith AI API is running",
            "frontend": "hosted externally",
            "diagnostics": {
                # NOTE(review): this value is hard-coded and does not reflect any
                # actual configuration check — confirm whether that is intended.
                "deepseek_configured": "YES",
                "yolo_available": "YES" if analyzer.yolo else "NO",
                "whisper_available": "YES" if analyzer.audio_proc else "NO"
            }
        }
# Development entry point: start the server when this file is executed directly.
if __name__ == "__main__":
    import uvicorn

    # Listen on every network interface, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)