"""HTTP API for Zenith AI: video ingestion, frame extraction and streamed analysis."""

import asyncio
import json
import os
import shutil
import time
import uuid
from pathlib import Path
from typing import Optional, Tuple

from fastapi import FastAPI, UploadFile, File, Form, BackgroundTasks, HTTPException
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles

from backend.core.engine import ZenithAnalyzer, VideoDownloader, OUTPUT_DIR

# Size of the chunks used when persisting an upload to disk, so that a large
# video is never held in memory in one piece.
_UPLOAD_CHUNK_SIZE = 1024 * 1024

app = FastAPI(title="Zenith AI API")

# Permissive CORS configuration so that a frontend hosted elsewhere can talk
# to this API.
# NOTE(review): wildcard origins combined with allow_credentials=True lets any
# site issue credentialed requests. Replace "*" with the real frontend URL
# (e.g. the Vercel deployment) before relying on cookies or auth headers.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

analyzer = ZenithAnalyzer()


def cleanup_old_sessions(max_age_hours: float = 1) -> None:
    """Delete ``session_*`` directories under OUTPUT_DIR older than *max_age_hours*.

    Age is measured from the directory's modification time. This is a
    best-effort sweep: a failure on one directory is reported and the
    remaining directories are still processed.
    """
    now = time.time()
    max_age_seconds = max_age_hours * 3600
    for path in OUTPUT_DIR.glob("session_*"):
        # Handle errors per entry so one undeletable directory (permissions,
        # concurrent removal, ...) does not abort the whole sweep.
        try:
            if path.is_dir() and (now - path.stat().st_mtime) > max_age_seconds:
                shutil.rmtree(path)
        except Exception as e:  # background-task boundary: never propagate
            print(f"Erreur lors du nettoyage : {e}")


def _create_session() -> Tuple[str, Path]:
    """Create a fresh session directory and return ``(session_id, session_dir)``."""
    session_id = str(uuid.uuid4())
    session_dir = OUTPUT_DIR / f"session_{session_id}"
    session_dir.mkdir(parents=True, exist_ok=True)
    return session_id, session_dir


def _resolve_video_path(video_path: str) -> Path:
    """Resolve *video_path* and ensure it is located inside OUTPUT_DIR.

    Raises:
        HTTPException: 403 when the resolved path is outside OUTPUT_DIR.
    """
    abs_video_path = Path(video_path).resolve()
    abs_output_dir = OUTPUT_DIR.resolve()
    # Compare path components rather than string prefixes: a plain
    # ``startswith`` would accept a sibling such as "<output>_evil/x".
    try:
        abs_video_path.relative_to(abs_output_dir)
    except ValueError:
        raise HTTPException(
            status_code=403, detail="Accès au fichier non autorisé"
        ) from None
    return abs_video_path


@app.post("/analyze/url")
async def analyze_url(
    url: str = Form(...),
    background_tasks: BackgroundTasks = BackgroundTasks(),
):
    """Download the video at *url* into a new session directory.

    Returns the session id, the server-side path of the downloaded video and
    the relative URL under which it is served.

    Raises:
        HTTPException: 400 when the download yields no file.
    """
    background_tasks.add_task(cleanup_old_sessions)
    session_id, session_dir = _create_session()

    # NOTE(review): this call is synchronous and runs on the event loop, so it
    # stalls other requests while downloading. Consider moving it to a worker
    # thread once VideoDownloader is confirmed to be thread-safe.
    video_path = VideoDownloader.download(url, session_dir)
    if not video_path:
        raise HTTPException(
            status_code=400, detail="Échec du téléchargement de la vidéo"
        )

    # Relative URL consumed by the frontend.
    video_url = f"/output/session_{session_id}/{video_path.name}"
    return {
        "session_id": session_id,
        "video_path": str(video_path),
        "video_url": video_url,
    }


@app.post("/analyze/upload")
async def analyze_upload(
    file: UploadFile = File(...),
    background_tasks: BackgroundTasks = BackgroundTasks(),
):
    """Store an uploaded video in a new session directory.

    Returns the session id, the server-side path of the stored video and the
    relative URL under which it is served.

    Raises:
        HTTPException: 400 when the upload carries no usable file name.
    """
    background_tasks.add_task(cleanup_old_sessions)

    # The file name is client-controlled: keep only its final component so
    # that values such as "../../x" or "/etc/x" cannot escape the session dir.
    safe_name = Path(file.filename or "").name
    if safe_name in ("", ".", ".."):
        raise HTTPException(status_code=400, detail="Nom de fichier invalide")

    session_id, session_dir = _create_session()
    file_path = session_dir / safe_name

    # Copy in chunks instead of loading the whole video into memory.
    with open(file_path, "wb") as buffer:
        while True:
            chunk = await file.read(_UPLOAD_CHUNK_SIZE)
            if not chunk:
                break
            buffer.write(chunk)

    # Relative URL consumed by the frontend.
    video_url = f"/output/session_{session_id}/{safe_name}"
    return {
        "session_id": session_id,
        "video_path": str(file_path),
        "video_url": video_url,
    }


@app.post("/analyze/extract-frames")
async def extract_frames(session_id: str = Form(...), video_path: str = Form(...)):
    """Extract frames from a previously stored video.

    Raises:
        HTTPException: 403 when *video_path* is outside OUTPUT_DIR, 500 when
            the analyzer fails.
    """
    # Security: only files inside the output directory may be processed.
    abs_video_path = _resolve_video_path(video_path)

    try:
        frames = await analyzer.extract_frames_only(abs_video_path, session_id)
    except HTTPException:
        # Preserve deliberate HTTP errors instead of masking them as 500s.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    return {"status": "success", "frames": frames}


@app.get("/stream/{session_id}")
async def stream_analysis(
    session_id: str, video_path: str, prompt: Optional[str] = None
):
    """Stream analysis updates for a stored video as server-sent events.

    Each update produced by the analyzer is sent as one JSON ``data:`` event.
    A failure during the analysis is reported as a final event carrying an
    ``error`` key, since the response has already started by then.

    Raises:
        HTTPException: 403 when *video_path* is outside OUTPUT_DIR.
    """
    # Security: only files inside the output directory may be processed.
    abs_video_path = _resolve_video_path(video_path)

    async def event_generator():
        try:
            async for update in analyzer.run_full_analysis(
                abs_video_path, session_id, custom_prompt=prompt
            ):
                yield f"data: {json.dumps(update)}\n\n"
        except Exception as e:
            yield f"data: {json.dumps({'error': str(e)})}\n\n"

    return StreamingResponse(event_generator(), media_type="text/event-stream")


# Serve the output directory so extracted frames and videos are reachable.
# StaticFiles refuses to start when the directory is missing, so create it.
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
app.mount("/output", StaticFiles(directory=str(OUTPUT_DIR)), name="output")

# Serve the static frontend only when it is present next to the API.
frontend_path = Path("frontend")
if frontend_path.exists():
    app.mount("/", StaticFiles(directory="frontend", html=True), name="frontend")
else:

    @app.get("/")
    async def root():
        """Report API status and which optional components are available."""
        return {
            "status": "Zenith AI API is running",
            "frontend": "hosted externally",
            "diagnostics": {
                # NOTE(review): hard-coded; this does not actually verify any
                # DeepSeek configuration. Confirm what should be checked here.
                "deepseek_configured": "YES",
                "yolo_available": "YES" if analyzer.yolo else "NO",
                "whisper_available": "YES" if analyzer.audio_proc else "NO",
            },
        }


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)