| import os |
| import subprocess |
| import datetime |
| import json |
| import time |
| import shutil |
| import uuid |
| import asyncio |
| import aiosqlite |
| from fastapi import FastAPI, UploadFile, File, BackgroundTasks, WebSocket, WebSocketDisconnect, Request, HTTPException |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.responses import FileResponse, JSONResponse, StreamingResponse |
| from faster_whisper import WhisperModel |
| import requests |
| from collections import deque |
| import psutil |
| try: |
| from dotenv import load_dotenv |
| |
| load_dotenv() |
| except ImportError: |
| print("DEBUG: python-dotenv non trouvé, utilisation des variables d'environnement système") |
|
|
| |
# --- Storage locations -------------------------------------------------------
UPLOAD_DIR = "uploads"
OUTPUT_DIR = "outputs"
STATIC_DIR = "static"
DB_PATH = "analytics.db"

# Create the working directories at import time so later code can assume
# they exist.
for _required_dir in (UPLOAD_DIR, OUTPUT_DIR, STATIC_DIR):
    os.makedirs(_required_dir, exist_ok=True)

# --- Limits ------------------------------------------------------------------
MAX_UPLOAD_SIZE = 5 * 1024 ** 3   # bytes; largest accepted upload
CHUNK_SIZE = 10 * 1024 ** 2       # bytes read from an upload per iteration
MAX_CONCURRENT_TASKS = 3          # tasks allowed in ``active_tasks`` at once
MAX_QUEUE_SIZE = 10               # queued + active tasks before uploads get a 429
MEMORY_THRESHOLD = 85             # percent of RAM above which uploads are refused

# --- In-memory scheduling state ----------------------------------------------
# ``processing_queue`` holds task dicts waiting to start, ``active_tasks`` the
# file_ids currently running; both are guarded by ``queue_lock``.
processing_queue = deque()
active_tasks = set()
queue_lock = asyncio.Lock()
|
|
| |
async def init_db():
    """Create the ``visitors``, ``feedback`` and ``tasks`` tables if missing.

    Every statement uses ``CREATE TABLE IF NOT EXISTS``, so calling this on an
    already-initialised database changes nothing.
    """
    schema = (
        """
        CREATE TABLE IF NOT EXISTS visitors (
            session_id TEXT PRIMARY KEY,
            start_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            last_activity TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS feedback (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            client_id TEXT,
            rating TEXT,
            comment TEXT,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS tasks (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            file_id TEXT UNIQUE,
            client_id TEXT,
            filename TEXT,
            status TEXT,
            progress INTEGER DEFAULT 0,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            result_url TEXT
        )
        """,
    )
    async with aiosqlite.connect(DB_PATH) as conn:
        for ddl in schema:
            await conn.execute(ddl)
        await conn.commit()
|
|
| |
def _remove_stale_files(folders, max_age_seconds):
    """Delete regular files in *folders* last modified over *max_age_seconds* ago.

    Errors are handled per file, so one entry that vanished or cannot be
    removed does not stop the rest of the sweep.
    """
    cutoff = time.time() - max_age_seconds
    for folder in folders:
        try:
            names = os.listdir(folder)
        except OSError:
            # Folder missing or unreadable: nothing to clean there.
            continue
        for name in names:
            path = os.path.join(folder, name)
            try:
                if os.path.isfile(path) and os.stat(path).st_mtime < cutoff:
                    os.remove(path)
                    print(f"Cleanup: Suppression de {name}")
            except OSError as e:
                # Typically a file removed by another coroutine between
                # listdir() and stat()/remove().
                print(f"Cleanup error: {e}")


async def cleanup_old_files():
    """Every 10 minutes, delete upload/output files older than one hour.

    Runs forever; it is meant to be started as a background task and cancelled
    on shutdown. The filesystem sweep runs in a worker thread so it does not
    block the event loop.
    """
    while True:
        try:
            await asyncio.to_thread(_remove_stale_files, [UPLOAD_DIR, OUTPUT_DIR], 3600)
        except Exception as e:
            print(f"Cleanup error: {e}")
        await asyncio.sleep(600)
|
|
| |
async def check_system_resources():
    """Tell whether RAM usage is low enough to accept more work.

    Returns:
        A ``(ok, message)`` tuple: ``(True, "OK")`` while memory usage is at or
        below ``MEMORY_THRESHOLD`` percent, otherwise ``(False, reason)``.
    """
    used_percent = psutil.virtual_memory().percent
    if used_percent <= MEMORY_THRESHOLD:
        return True, "OK"
    return False, f"Mémoire saturée ({used_percent}%)"
|
|
async def process_queue():
    """Background scheduler: report queue positions and start waiting tasks.

    Every 2 seconds this:
      1. tells each queued client its current position, and
      2. starts queued tasks until ``MAX_CONCURRENT_TASKS`` are active.

    Runs forever; intended to be started as a background task and cancelled on
    shutdown.
    """
    # Strong references to the spawned tasks: the event loop itself only keeps
    # weak references, so an unreferenced task could be garbage-collected
    # before it finishes.
    running = set()

    while True:
        try:
            # Snapshot under the lock, then notify outside it, so slow DB or
            # WebSocket I/O never blocks uploads waiting on ``queue_lock``.
            # Only this coroutine removes items from the queue, so the
            # snapshot's ordering stays valid while notifications are sent.
            async with queue_lock:
                waiting = list(processing_queue)

            total = len(waiting)
            for position, task_info in enumerate(waiting, start=1):
                client_id = task_info.get('client_id')
                file_id = task_info.get('file_id')
                if client_id and file_id:
                    await send_status(
                        client_id,
                        f"En file d'attente (Position: {position}/{total})",
                        5,
                        file_id
                    )

            async with queue_lock:
                # Fill every free slot in one pass instead of one per tick.
                while processing_queue and len(active_tasks) < MAX_CONCURRENT_TASKS:
                    task_info = processing_queue.popleft()
                    active_tasks.add(task_info['file_id'])

                    task = asyncio.create_task(execute_task(task_info))
                    running.add(task)
                    task.add_done_callback(running.discard)
                    print(f"[QUEUE] Démarrage tâche {task_info['file_id']} ({len(active_tasks)}/{MAX_CONCURRENT_TASKS} actives)")
        except Exception as e:
            print(f"Queue error: {e}")

        await asyncio.sleep(2)
|
|
async def execute_task(task_info):
    """Run one queued task and always release its slot afterwards.

    Args:
        task_info: dict with the keys ``file_id``, ``client_id`` and
            ``audio_path``, as queued by the upload endpoint.

    Any exception from the processing pipeline is logged and reported to the
    client and the database as an error; it is not re-raised.
    """
    file_id, client_id = task_info['file_id'], task_info['client_id']
    try:
        # Short pause before the first status update (reason not visible here).
        await asyncio.sleep(1)

        await send_status(client_id, "Démarrage du traitement...", 10, file_id)
        await update_task_db(file_id, "En cours", 10)

        await run_hybrid_processing(file_id, client_id, task_info['audio_path'])
    except Exception as err:
        print(f"[TASK {file_id}] Erreur: {err}")
        await send_status(client_id, "Erreur de traitement", 0, file_id)
        await update_task_db(file_id, "Erreur", 0)
    finally:
        # Free the concurrency slot whatever happened above.
        async with queue_lock:
            active_tasks.discard(file_id)
            print(f"[QUEUE] Tâche {file_id} terminée ({len(active_tasks)}/{MAX_CONCURRENT_TASKS} actives)")
|
|
from contextlib import asynccontextmanager


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: prepare the database and run background workers.

    On startup the schema is created and the cleanup and queue workers are
    started. On shutdown, including when the application exits with an error,
    both workers are cancelled and awaited so they stop cleanly.
    """
    await init_db()
    workers = [
        asyncio.create_task(cleanup_old_files()),
        asyncio.create_task(process_queue()),
    ]
    try:
        yield
    finally:
        for worker in workers:
            worker.cancel()
        # return_exceptions=True absorbs the expected CancelledError of each worker.
        await asyncio.gather(*workers, return_exceptions=True)


app = FastAPI(lifespan=lifespan)
|
|
async def track_visitor(session_id: str):
    """Record a visitor session, or refresh its ``last_activity`` timestamp.

    Does nothing for an empty ``session_id``. Database errors are logged and
    swallowed so that analytics can never break the caller.
    """
    upsert_sql = """
        INSERT INTO visitors (session_id) VALUES (?)
        ON CONFLICT(session_id) DO UPDATE SET last_activity = CURRENT_TIMESTAMP
    """
    try:
        if not session_id:
            return
        async with aiosqlite.connect(DB_PATH) as conn:
            await conn.execute(upsert_sql, (session_id,))
            await conn.commit()
    except Exception as err:
        print(f"Error tracking visitor: {err}")
|
|
async def update_task_db(file_id: str, status: str, progress: int, client_id: str = None, filename: str = None, result_url: str = None):
    """Create or update the ``tasks`` row identified by ``file_id``.

    Args:
        file_id: Unique task identifier.
        status: Human-readable status text to store.
        progress: Progress percentage to store.
        client_id: Owner of the task; only stored when registering a task.
        filename: When given (even empty), the task is registered, or
            re-registered if ``file_id`` already exists. When ``None`` the
            existing row is updated instead.
        result_url: When truthy, also stored on the row during an update.

    Database errors are logged and swallowed; callers are never interrupted by
    a bookkeeping failure.
    """
    try:
        async with aiosqlite.connect(DB_PATH) as db:
            if filename is not None:
                # Upsert: a re-upload reusing a file_id would otherwise hit the
                # UNIQUE constraint and leave the row with its old status.
                await db.execute("""
                    INSERT INTO tasks (file_id, client_id, filename, status, progress)
                    VALUES (?, ?, ?, ?, ?)
                    ON CONFLICT(file_id) DO UPDATE SET
                        client_id = excluded.client_id,
                        filename = excluded.filename,
                        status = excluded.status,
                        progress = excluded.progress
                """, (file_id, client_id, filename, status, progress))
            elif result_url:
                await db.execute(
                    "UPDATE tasks SET status = ?, progress = ?, result_url = ? WHERE file_id = ?",
                    (status, progress, result_url, file_id),
                )
            else:
                await db.execute(
                    "UPDATE tasks SET status = ?, progress = ? WHERE file_id = ?",
                    (status, progress, file_id),
                )
            await db.commit()
    except Exception as e:
        print(f"Error updating task DB: {e}")
|
|
| |
# Serve the front-end assets and the rendered videos straight from disk.
for _route, _folder, _mount_name in (
    ("/static", STATIC_DIR, "static"),
    ("/outputs", OUTPUT_DIR, "outputs"),
):
    app.mount(_route, StaticFiles(directory=_folder), name=_mount_name)

# Per-file rendezvous state between the audio pipeline and the video upload.
# Keyed by file_id; each value is a dict with the keys "segments",
# "corrected", "video_ready" (an asyncio.Event) and "video_path".
hybrid_tasks = {}
|
|
# Strong references to in-flight finalize_video() tasks. The event loop only
# keeps weak references to tasks, so an unreferenced task may be
# garbage-collected before it completes.
_finalize_tasks = set()


async def run_hybrid_processing(file_id: str, client_id: str, audio_path: str):
    """Transcribe and correct the audio, then wait for the matching video.

    Steps:
      1. Whisper transcription of ``audio_path`` in a worker thread.
      2. DeepSeek correction of the segment texts, falling back to the raw
         transcription when the correction fails.
      3. Wait up to 10 minutes for the video upload to signal ``video_ready``
         in ``hybrid_tasks``.
      4. Notify the client and start ``finalize_video`` in the background.

    The ``hybrid_tasks`` entry and the temporary audio file are always removed
    on exit. Errors are logged and reported to the client; nothing is raised.
    """
    try:
        print(f"[{file_id}] Début du traitement hybride sur {audio_path}")
        await send_status(client_id, "Analyse audio accélérée...", 20, file_id)

        print(f"[{file_id}] Lancement Whisper...")

        def transcribe():
            segs, _ = model_whisper.transcribe(audio_path, word_timestamps=True, vad_filter=True, beam_size=1)
            # The result is consumed inside the worker thread, since the list
            # is only built by iterating ``segs``.
            return list(segs)

        segments = await asyncio.to_thread(transcribe)
        raw_texts = [s.text.strip() for s in segments]
        print(f"[{file_id}] Whisper terminé: {len(segments)} segments trouvés")

        await send_status(client_id, "Correction IA (DeepSeek)...", 45, file_id)
        print(f"[{file_id}] Lancement DeepSeek...")
        corrected_texts = await deepseek_client.generate_correction(raw_texts)
        if corrected_texts is None:
            print(f"[{file_id}] DeepSeek a échoué ou n'a renvoyé aucun texte")
            corrected_texts = raw_texts
        print(f"[{file_id}] DeepSeek terminé")

        # The video upload may already have created the entry (and set the
        # event) if it finished before the audio pipeline got here.
        if file_id not in hybrid_tasks:
            hybrid_tasks[file_id] = {"segments": None, "corrected": None, "video_ready": asyncio.Event(), "video_path": None}

        hybrid_tasks[file_id]["segments"] = segments
        hybrid_tasks[file_id]["corrected"] = corrected_texts

        await send_status(client_id, "IA terminée. Attente de la vidéo...", 65, file_id)
        print(f"[{file_id}] Phase IA ok, attente du signal video_ready...")

        try:
            await asyncio.wait_for(hybrid_tasks[file_id]["video_ready"].wait(), timeout=600)
        except asyncio.TimeoutError:
            print(f"[{file_id}] Timeout: La vidéo n'a pas été reçue à temps")
            await send_status(client_id, "Erreur: Timeout upload vidéo", 0, file_id)
            return

        video_path = hybrid_tasks[file_id]["video_path"]
        print(f"[{file_id}] Vidéo reçue à {video_path}, lancement de la finalisation...")

        await send_status(client_id, "Vidéo prête !", 100, file_id)

        # NOTE(review): the stream URL is announced before finalize_video() has
        # written the .ass file, so an immediate request to /stream may get
        # "Subtitles not ready" — confirm the client retries.
        stream_url = f"/stream/{file_id}"
        websocket = active_connections.get(client_id)
        if websocket is not None:
            try:
                await websocket.send_json({
                    "status": "Prêt !",
                    "progress": 100,
                    "stream_url": stream_url,
                    "file_id": file_id
                })
            except Exception as e:
                # A client that dropped mid-send must not cancel the render.
                print(f"[WS] Erreur envoi à {client_id}: {e}")

        # The burn-in continues in the background so this task's slot is freed.
        finalize_task = asyncio.create_task(
            finalize_video(file_id, client_id, video_path, segments, corrected_texts)
        )
        _finalize_tasks.add(finalize_task)
        finalize_task.add_done_callback(_finalize_tasks.discard)

    except Exception as e:
        import traceback
        traceback.print_exc()
        print(f"Erreur Hybrid [{file_id}]: {e}")
        await send_status(client_id, "Erreur de traitement", 0, file_id)
    finally:
        hybrid_tasks.pop(file_id, None)
        try:
            os.remove(audio_path)
        except FileNotFoundError:
            pass
        except OSError as e:
            # Best effort: the periodic cleanup will retry later.
            print(f"[{file_id}] Impossible de supprimer {audio_path}: {e}")
|
|
async def finalize_video(file_id: str, client_id: str, video_path: str, segments, corrected_texts):
    """Write the ASS subtitle file and burn it into the video with ffmpeg.

    Args:
        file_id: Task identifier; used to name the ``.ass`` and output files.
        client_id: Client to notify over WebSocket.
        video_path: Path of the uploaded source video.
        segments: Transcription segments exposing ``start`` and ``end`` (seconds).
        corrected_texts: One corrected text per segment, in the same order.

    Failures while generating subtitles or encoding are logged and reported to
    the client. A failure to *notify* the client after a successful encode is
    only logged, so a finished task is never marked as failed.
    """
    output_video = os.path.join(OUTPUT_DIR, f"{file_id}_final.mp4")
    ass_path = os.path.join(UPLOAD_DIR, f"{file_id}.ass")

    try:
        await send_status(client_id, "Génération des styles...", 75, file_id)

        header = "[Script Info]\nScriptType: v4.00+\nPlayResX: 384\nPlayResY: 288\n\n[V4+ Styles]\nFormat: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding\nStyle: Premium,Arial,18,&H0000FFFF,&H00FFFFFF,&H00000000,&H00000000,-1,0,0,0,100,100,0,0,1,2,0,2,10,10,50,1\n\n[Events]\nFormat: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text\n"

        def write_ass():
            with open(ass_path, "w", encoding="utf-8") as f:
                f.write(header)
                for seg_orig, text_corr in zip(segments, corrected_texts):
                    # str() guards against a non-string item in the corrections.
                    words = str(text_corr).split()
                    if not words:
                        continue
                    # Spread the segment duration evenly over its words.
                    dur = (seg_orig.end - seg_orig.start) / max(1, len(words))
                    karaoke_cs = max(1, int(dur * 100))
                    # Show the words three at a time, one Dialogue line per group.
                    for i in range(0, len(words), 3):
                        chunk = words[i:i+3]
                        c_start = seg_orig.start + (i * dur)
                        c_end = c_start + (len(chunk) * dur)
                        line = "".join([f"{{\\k{karaoke_cs}}}{w.upper()} " for w in chunk])
                        f.write(f"Dialogue: 0,{format_ass_time(c_start)},{format_ass_time(c_end)},Premium,,0,0,0,,{line.strip()}\n")

        await asyncio.to_thread(write_ass)

        await send_status(client_id, "Incrustation finale...", 90, file_id)

        cmd = ["ffmpeg", "-y", "-i", video_path, "-vf", f"ass={ass_path}", "-c:v", "libx264", "-preset", "ultrafast", "-crf", "22", "-c:a", "copy", "-threads", "0", output_video]

        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        # communicate() drains both pipes, so ffmpeg cannot block on a full one.
        _stdout, stderr = await process.communicate()

        if process.returncode != 0:
            # errors="replace": ffmpeg output is not guaranteed to be valid UTF-8.
            print(f"FFmpeg error: {stderr.decode(errors='replace')}")
            raise RuntimeError("FFmpeg failed")

    except Exception as e:
        print(f"Erreur Finalize: {e}")
        await send_status(client_id, "Erreur lors de l'incrustation", 0, file_id)
        return

    # The encode succeeded; nothing below may turn the task into an error.
    result_url = f"/outputs/{file_id}_final.mp4"
    await send_status(client_id, "Prêt !", 100, file_id)
    await update_task_db(file_id, "Terminé", 100, result_url=result_url)

    websocket = active_connections.get(client_id)
    if websocket is not None:
        try:
            await websocket.send_json({"result_url": result_url})
        except Exception as e:
            print(f"[WS] Erreur envoi à {client_id}: {e}")
|
|
| |
# DeepSeek-compatible chat-completions endpoint settings. Each value can be
# overridden through an environment variable of the same name (for example
# from the .env file loaded at startup), so the real API key does not have to
# live in the source. The previous literals remain as defaults.
DEEPSEEK_API_URL = os.getenv("DEEPSEEK_API_URL", "https://ds2api-tau-woad.vercel.app/v1/chat/completions")
DEEPSEEK_API_KEY = os.getenv("DEEPSEEK_API_KEY", "sk-ds2api-key-1-your-custom-key")
DEEPSEEK_MODEL = os.getenv("DEEPSEEK_MODEL", "deepseek-chat")
|
|
class DeepSeekClient:
    """Small client asking a chat-completions API to correct ASR segments."""

    def __init__(self):
        self.api_url = DEEPSEEK_API_URL
        self.api_key = DEEPSEEK_API_KEY
        self.model = DEEPSEEK_MODEL

    @staticmethod
    def _build_prompt(segments_text):
        """Return the user prompt embedding the full context and the segment list."""
        full_context = " ".join(segments_text)
        return (
            "Tu es un expert en post-production vidéo et correction de transcriptions (ASR).\n\n"
            "=== RÈGLE N°1 - SYNCHRO APPROXIMATIVE ===\n"
            "Chaque mot corrigé doit avoir une durée de prononciation PROCHE du mot d'origine.\n"
            "Tolérance autorisée : ±1 syllabe par mot. Si l'écart est plus grand, laisse l'original.\n\n"
            "=== RÈGLE N°2 - AJOUTS ET SUPPRESSIONS LIMITÉS ===\n"
            "Tu peux AJOUTER ou SUPPRIMER jusqu'à 2 mots par segment si nécessaire.\n"
            "Exceptions :\n"
            "- Tu peux ajouter 'ne' devant un verbe pour compléter une négation ('ne...pas')\n"
            "- Tu peux supprimer un mot parasite évident (ex: 'de' en trop, 'que' en trop)\n"
            "- Tu peux remplacer un mot inventé par le mot correct (même si longueur différente)\n\n"
            "=== RÈGLE N°3 - RÉÉCRITURE DE SEGMENT HALLUCINÉ ===\n"
            "Si dans un segment, PLUS DE 50% des mots sont des hallucinations ASR :\n"
            "- mots inexistants (ex: 'poisonnerait')\n"
            "- mots hors contexte total\n"
            "- séquence grammaticalement impossible\n"
            "ALORS tu peux RÉÉCRIRE LE SEGMENT ENTIÈREMENT en respectant le sens et la durée approximative.\n\n"
            "=== RÈGLE N°4 - COHÉRENCE GRAMMATICALE PRIORITAIRE ===\n"
            "Ces règles sont prioritaires sur le respect mot à mot :\n"
            "1. Sujet et verbe doivent s'accorder\n"
            "2. Les temps doivent être cohérents sur l'ensemble du texte\n"
            "3. Les pronoms (je/tu/il/on/nous/vous/ils) doivent être stables\n"
            "4. Les négations doivent être complètes ('ne...pas', 'ne...plus')\n\n"
            "=== RÈGLE N°5 - INTERDICTION D'HALLUCINER ===\n"
            "Si un mot est inaudible → garde-le tel quel.\n"
            "Si une correction est incertaine → garde l'original.\n"
            "Ne crée JAMAIS un mot qui n'existe pas dans la langue.\n"
            "Ne reformule PAS une phrase entière SAUF si la règle N°3 s'applique.\n\n"
            "=== CONTEXTE GLOBAL DE LA VIDÉO ===\n"
            f"\"{full_context}\"\n\n"
            "=== LISTE DES SEGMENTS À CORRIGER ===\n"
            f"{json.dumps(segments_text, ensure_ascii=False)}\n\n"
            "=== FORMAT DE RÉPONSE OBLIGATOIRE ===\n"
            "Renvoie UNIQUEMENT un JSON valide comme ceci :\n"
            "{\"corrected\": [\"segment1 corrigé\", \"segment2 corrigé\", ...]}\n\n"
            "=== RAPPEL FINAL ===\n"
            "Tu as le DROIT de :\n"
            "- ajouter/supprimer jusqu'à 2 mots par segment\n"
            "- réécrire entièrement un segment si plus de 50% est halluciné\n"
            "- modifier la longueur syllabique (tolérance ±1 syllabe)\n"
            "Tu ne dois PAS :\n"
            "- changer le nombre de segments\n"
            "- inventer des mots inexistants\n"
            "- inclure du texte en dehors du JSON"
        )

    @staticmethod
    def _parse_corrections(raw_text, expected_count):
        """Extract the ``corrected`` list from the model's reply.

        Markdown code fences around the JSON are stripped first. Returns the
        list only when it is a list of exactly ``expected_count`` strings;
        otherwise returns ``None`` so the caller keeps the raw transcription.

        Raises:
            json.JSONDecodeError: if the reply is not valid JSON.
        """
        clean_json = raw_text.replace('```json', '').replace('```', '').strip()
        parsed = json.loads(clean_json)
        result = parsed.get("corrected") if isinstance(parsed, dict) else None

        if not isinstance(result, list) or not all(isinstance(text, str) for text in result):
            print("ERREUR: Réponse sans liste 'corrected' valide")
            return None

        if len(result) != expected_count:
            print(f"ERREUR: Nombre de segments incorrect (attendu {expected_count}, reçu {len(result)})")
            return None

        return result

    async def generate_correction(self, segments_text):
        """Ask the API to correct the transcription, one text per segment.

        Args:
            segments_text: List of raw segment texts, in order.

        Returns:
            A list of corrected texts with the same length as ``segments_text``,
            or ``None`` when no API key is configured, the request fails, or
            the reply is unusable. An empty input returns ``[]`` without any
            API call.
        """
        if not self.api_key:
            print("DEBUG DEEPSEEK: API Key manquante")
            return None

        if not segments_text:
            # Nothing to correct: skip the network round trip.
            return []

        payload = {
            "model": self.model,
            "messages": [
                {"role": "system", "content": "Tu es un assistant expert en correction de transcriptions. Tu réponds UNIQUEMENT en JSON valide. Tu as le droit d'ajouter/supprimer jusqu'à 2 mots par segment et de réécrire les segments hallucinés."},
                {"role": "user", "content": self._build_prompt(segments_text)}
            ],
            "temperature": 0.2,
            "max_tokens": 4000
        }

        headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json"
        }

        try:
            print(f"DEBUG DEEPSEEK: Envoi requête correction ({len(segments_text)} segments)...")
            # requests is synchronous: run it in a worker thread so the event
            # loop keeps serving other clients during the (up to 90 s) call.
            resp = await asyncio.to_thread(
                requests.post, self.api_url, headers=headers, json=payload, timeout=90
            )
            if resp.status_code != 200:
                print(f"DEBUG DEEPSEEK: Échec correction ({resp.status_code}): {resp.text}")
                return None

            raw_text = resp.json()['choices'][0]['message']['content']
            result = self._parse_corrections(raw_text, len(segments_text))
            if result is not None:
                print(f"DEBUG DEEPSEEK: Correction réussie ({len(result)} segments reçus)")
            return result
        except Exception as e:
            print(f"DEBUG DEEPSEEK: Exception lors de la correction: {e}")
        return None
|
|
@app.get("/stream/{file_id}")
async def stream_video(file_id: str):
    """Stream the uploaded video with subtitles burned in on the fly.

    Looks in ``UPLOAD_DIR`` for the uploaded video (``<file_id>_*``) and the
    subtitle file (``<file_id>.ass``), then pipes a fragmented MP4 from ffmpeg
    to the client. Responds 404 when either file is missing or when
    ``file_id`` does not have the shape of the ids issued by the upload
    endpoint.
    """
    # file_id ends up in a file path and in ffmpeg's filter string, so only
    # accept the exact id format produced by /upload.
    if not re.fullmatch(r'[a-f0-9-]{36}', file_id):
        return JSONResponse(status_code=404, content={"error": "Video not found"})

    prefix = file_id + "_"
    video_files = [
        f for f in os.listdir(UPLOAD_DIR)
        if f.startswith(prefix) and not f.endswith((".ass", ".wav"))
    ]
    if not video_files:
        return JSONResponse(status_code=404, content={"error": "Video not found"})

    video_path = os.path.join(UPLOAD_DIR, video_files[0])
    ass_path = os.path.join(UPLOAD_DIR, f"{file_id}.ass")

    if not os.path.exists(ass_path):
        return JSONResponse(status_code=404, content={"error": "Subtitles not ready"})

    async def video_generator():
        cmd = [
            "ffmpeg", "-y", "-i", video_path,
            "-vf", f"ass={ass_path}",
            "-c:v", "libx264", "-preset", "ultrafast", "-crf", "22",
            "-c:a", "copy",
            "-f", "mp4",
            # Fragmented MP4 so the output can be written to a non-seekable pipe.
            "-movflags", "frag_keyframe+empty_moov+default_base_moof",
            "pipe:1"
        ]

        # stderr goes to DEVNULL: nothing reads it here, and an unread PIPE
        # would eventually fill up and stall ffmpeg mid-stream.
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.DEVNULL
        )

        try:
            while True:
                data = await process.stdout.read(1024 * 64)
                if not data:
                    break
                yield data
        except Exception as e:
            print(f"Streaming error: {e}")
        finally:
            # Stop ffmpeg if the client went away early, then reap the process.
            if process.returncode is None:
                try:
                    process.terminate()
                except ProcessLookupError:
                    pass
            await process.wait()

    return StreamingResponse(video_generator(), media_type="video/mp4")
|
|
| |
@app.post("/api/feedback")
async def post_feedback(request: Request):
    """Store one feedback entry read from the JSON request body.

    The body fields ``client_id``, ``rating`` and ``comment`` are saved as
    given (missing fields become NULL). Returns ``{"status": "success"}``, or
    a 500 JSON response carrying the error text on any failure.
    """
    insert_sql = """
        INSERT INTO feedback (client_id, rating, comment)
        VALUES (?, ?, ?)
    """
    try:
        payload = await request.json()
        row = (payload.get("client_id"), payload.get("rating"), payload.get("comment"))

        async with aiosqlite.connect(DB_PATH) as conn:
            await conn.execute(insert_sql, row)
            await conn.commit()
        return {"status": "success"}
    except Exception as err:
        return JSONResponse(status_code=500, content={"error": str(err)})
|
|
@app.get("/api/feedbacks")
async def get_feedbacks():
    """Return every feedback row, newest first, as a list of dicts.

    On failure a 500 JSON response carrying the error text is returned.
    NOTE(review): unlike the other admin-style endpoints this one performs no
    password check — confirm that is intended.
    """
    try:
        async with aiosqlite.connect(DB_PATH) as conn:
            conn.row_factory = aiosqlite.Row
            cur = await conn.execute("SELECT * FROM feedback ORDER BY created_at DESC")
            return [dict(record) for record in await cur.fetchall()]
    except Exception as err:
        return JSONResponse(status_code=500, content={"error": str(err)})
|
|
# Shared client for the transcription-correction API.
deepseek_client = DeepSeekClient()

# Single Whisper model instance shared by all tasks, created at import time.
model_whisper = WhisperModel(
    "base",
    device="cpu",
    compute_type="int8",
    cpu_threads=4,
)

# Open WebSocket connections, keyed by client_id.
active_connections = {}
|
|
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    """Register a client's WebSocket and keep it open until it disconnects.

    Every received message only refreshes the visitor's activity timestamp.
    The connection is removed from ``active_connections`` when this handler
    exits for any reason.
    """
    await websocket.accept()
    await track_visitor(client_id)
    active_connections[client_id] = websocket
    try:
        while True:
            # The message content is ignored; it only acts as a keep-alive.
            await websocket.receive_text()
            await track_visitor(client_id)
    except WebSocketDisconnect:
        pass
    finally:
        # Only drop our own socket: if the client already reconnected, the
        # entry now belongs to the newer connection and must be kept.
        if active_connections.get(client_id) is websocket:
            del active_connections[client_id]
|
|
async def send_status(client_id: str, status: str, progress: int, file_id: str = None):
    """Persist a progress update and push it to the client if connected.

    When ``file_id`` is given the status and progress are first written to the
    tasks table. The WebSocket message is best effort: a missing connection or
    a send failure is only logged.
    """
    if file_id:
        await update_task_db(file_id, status, progress)

    if client_id not in active_connections:
        print(f"[WS] Client {client_id} non connecté, message ignoré: {status}")
        return

    message = {
        "status": status,
        "progress": progress,
        "file_id": file_id
    }
    try:
        await active_connections[client_id].send_json(message)
        print(f"[WS] Envoyé à {client_id}: {status} ({progress}%)")
    except Exception as err:
        print(f"[WS] Erreur envoi à {client_id}: {err}")
|
|
def format_ass_time(seconds: float):
    """Format a duration in seconds as an ASS timestamp ``H:MM:SS.cc``.

    The value is rounded to the nearest centisecond on the *total* duration,
    which avoids losing a centisecond to float representation (0.29 s must
    give ``.29``, not ``.28``) and lets a rounded-up fraction carry into the
    seconds, minutes and hours fields. Negative input is clamped to zero.

    >>> format_ass_time(3661.5)
    '1:01:01.50'
    """
    total_cs = max(0, round(seconds * 100))
    total_sec, cs = divmod(total_cs, 100)
    minutes, s = divmod(total_sec, 60)
    h, m = divmod(minutes, 60)
    return f"{h}:{m:02d}:{s:02d}.{cs:02d}"
|
|
async def run_processing(video_path: str, client_id: str, file_id: str):
    """Single-file pipeline: transcribe, correct, subtitle and encode a video.

    Args:
        video_path: Path of the source video (also used as the audio source).
        client_id: Client to notify over WebSocket.
        file_id: Task identifier; used to name the ``.ass`` and output files.

    The blocking steps (Whisper, subtitle file writing, ffmpeg) run in worker
    threads so the event loop stays responsive. Any failure is logged and
    recorded as an error for the task; nothing is raised.
    """
    try:
        output_video = os.path.join(OUTPUT_DIR, f"{file_id}_final.mp4")
        ass_path = os.path.join(UPLOAD_DIR, f"{file_id}.ass")

        await send_status(client_id, "Analyse audio...", 20, file_id)

        def transcribe():
            segs, _ = model_whisper.transcribe(video_path, word_timestamps=True, vad_filter=True, beam_size=1)
            # Consume the result here, inside the worker thread.
            return list(segs)

        segments = await asyncio.to_thread(transcribe)
        raw_texts = [s.text.strip() for s in segments]

        await send_status(client_id, "Correction IA (DeepSeek)...", 45, file_id)
        corrected_texts = await deepseek_client.generate_correction(raw_texts)
        if corrected_texts is None:
            # Correction unavailable: fall back to the raw transcription.
            corrected_texts = raw_texts

        await send_status(client_id, "Génération des styles...", 70, file_id)

        header = "[Script Info]\nScriptType: v4.00+\nPlayResX: 384\nPlayResY: 288\n\n[V4+ Styles]\nFormat: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding\nStyle: Premium,Arial,18,&H0000FFFF,&H00FFFFFF,&H00000000,&H00000000,-1,0,0,0,100,100,0,0,1,2,0,2,10,10,50,1\n\n[Events]\nFormat: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text\n"

        def write_ass():
            with open(ass_path, "w", encoding="utf-8") as f:
                f.write(header)
                for seg_orig, text_corr in zip(segments, corrected_texts):
                    words = text_corr.split()
                    if not words:
                        continue
                    # Spread the segment duration evenly over its words.
                    dur = (seg_orig.end - seg_orig.start) / max(1, len(words))
                    # Show the words three at a time, one Dialogue line per group.
                    for i in range(0, len(words), 3):
                        chunk = words[i:i+3]
                        c_start = seg_orig.start + (i * dur)
                        c_end = c_start + (len(chunk) * dur)
                        line = "".join([f"{{\\k{max(1, int(dur*100))}}}{w.upper()} " for w in chunk])
                        f.write(f"Dialogue: 0,{format_ass_time(c_start)},{format_ass_time(c_end)},Premium,,0,0,0,,{line.strip()}\n")

        await asyncio.to_thread(write_ass)

        await send_status(client_id, "Finalisation vidéo...", 85, file_id)

        cmd = ["ffmpeg", "-y", "-i", video_path, "-vf", f"ass={ass_path}", "-c:v", "libx264", "-preset", "ultrafast", "-crf", "22", "-c:a", "copy", "-threads", "0", output_video]
        # check=True raises CalledProcessError on a non-zero exit status.
        await asyncio.to_thread(subprocess.run, cmd, check=True)

        result_url = f"/outputs/{file_id}_final.mp4"
        await send_status(client_id, "Prêt !", 100, file_id)
        await update_task_db(file_id, "Terminé", 100, result_url=result_url)

        if client_id in active_connections:
            await active_connections[client_id].send_json({"result_url": result_url})

    except Exception as e:
        print(f"Erreur : {e}")
        await send_status(client_id, "Erreur de traitement", 0, file_id)
        await update_task_db(file_id, "Erreur", 0)
|
|
@app.get("/")
async def read_index():
    """Serve the front-end entry page, ``index.html`` from ``STATIC_DIR``."""
    index_path = os.path.join(STATIC_DIR, "index.html")
    return FileResponse(index_path)
|
|
| import re |
|
|
def sanitize_filename(filename: str) -> str:
    """Return *filename* reduced to characters that are safe in a file name.

    Every character other than ASCII letters, digits, ``.``, ``_`` and ``-``
    (including path separators) is replaced by ``_``. A missing or empty name
    yields ``"unnamed"`` so callers always get a usable, non-empty value.
    """
    if not filename:
        return "unnamed"
    return re.sub(r'[^a-zA-Z0-9._-]', '_', filename)
|
|
async def _save_upload(file, destination):
    """Stream *file* to *destination* in chunks, enforcing ``MAX_UPLOAD_SIZE``.

    The partially written file is removed on any failure.

    Raises:
        HTTPException: 413 when the upload exceeds the size limit, 500 for any
            other error while reading or writing.
    """
    total_size = 0
    try:
        with open(destination, "wb") as buffer:
            while chunk := await file.read(CHUNK_SIZE):
                total_size += len(chunk)
                if total_size > MAX_UPLOAD_SIZE:
                    raise HTTPException(status_code=413, detail=f"Fichier trop volumineux (max {MAX_UPLOAD_SIZE // (1024**3)}GB)")
                buffer.write(chunk)
                # Yield to the event loop between chunks.
                await asyncio.sleep(0)
    except Exception as e:
        # The file is closed at this point, so removing it is safe everywhere.
        if os.path.exists(destination):
            os.remove(destination)
        if isinstance(e, HTTPException):
            # Keep the intended status (413) instead of masking it as a 500.
            raise
        raise HTTPException(status_code=500, detail=f"Erreur upload: {str(e)}") from e


@app.post("/upload")
async def upload_video(background_tasks: BackgroundTasks, file: UploadFile = File(...), client_id: str = None, file_id: str = None, type: str = "video"):
    """Receive one half of a hybrid upload (audio track or full video).

    With ``type="audio"`` the file is saved, the task is registered in the
    database and appended to the processing queue. With any other ``type`` the
    file is saved as the task's video and the waiting pipeline is signalled
    through ``hybrid_tasks``.

    Args:
        background_tasks: Unused; kept for interface compatibility.
        file: The uploaded file.
        client_id: Client identifier used for notifications and analytics.
        file_id: Client-chosen task id; replaced by a fresh UUID unless it is
            36 characters of lowercase hex digits and dashes.
        type: ``"audio"`` or ``"video"`` (default).

    Raises:
        HTTPException: 503 when memory is saturated, 429 when the queue is
            full, 413 when the file is too large, 500 on a write error.
    """
    resources_ok, msg = await check_system_resources()
    if not resources_ok:
        raise HTTPException(status_code=503, detail=f"Serveur surchargé: {msg}")

    # NOTE(review): this capacity check also applies to the video half of a
    # task that was already accepted — confirm that rejecting it is intended.
    async with queue_lock:
        queue_size = len(processing_queue) + len(active_tasks)
        if queue_size >= MAX_QUEUE_SIZE:
            raise HTTPException(status_code=429, detail=f"Trop de requêtes en cours. Réessayez dans quelques instants. ({queue_size}/{MAX_QUEUE_SIZE})")

    # Only trust a client-supplied id with the expected shape, because it is
    # used to build file names.
    safe_file_id = str(uuid.uuid4())
    if file_id and re.fullmatch(r'[a-f0-9-]{36}', file_id):
        safe_file_id = file_id

    # The client may omit the filename; fall back to a placeholder.
    safe_filename = sanitize_filename(file.filename or "unnamed")

    if type == "audio":
        audio_path = os.path.join(UPLOAD_DIR, f"{safe_file_id}_temp.wav")
        await _save_upload(file, audio_path)

        await update_task_db(safe_file_id, "En attente", 5, client_id, safe_filename)
        if client_id:
            await track_visitor(client_id)

        async with queue_lock:
            processing_queue.append({
                'file_id': safe_file_id,
                'client_id': client_id,
                'audio_path': audio_path
            })
            queue_position = len(processing_queue)

        print(f"[QUEUE] Tâche {safe_file_id} ajoutée (position: {queue_position})")
        return {"file_id": safe_file_id, "status": "audio_received", "queue_position": queue_position}

    video_path = os.path.join(UPLOAD_DIR, f"{safe_file_id}_{safe_filename}")
    await _save_upload(file, video_path)

    # The video may arrive before the audio pipeline has created the entry.
    if safe_file_id not in hybrid_tasks:
        hybrid_tasks[safe_file_id] = {"segments": None, "corrected": None, "video_ready": asyncio.Event(), "video_path": None}

    hybrid_tasks[safe_file_id]["video_path"] = video_path
    hybrid_tasks[safe_file_id]["video_ready"].set()

    return {"file_id": safe_file_id, "status": "video_received"}
|
|
| |
# Password guarding the dashboard and admin API endpoints. Taken from the
# ADMIN_PASSWORD environment variable, falling back to a built-in default.
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD", "admin123")
|
|
@app.get("/dashboard")
async def get_dashboard(password: str = None):
    """Serve ``dashboard.html`` when the ``password`` query parameter matches.

    Any other value, including a missing one, gets a 403 JSON response.
    """
    if password == ADMIN_PASSWORD:
        dashboard_page = os.path.join(STATIC_DIR, "dashboard.html")
        return FileResponse(dashboard_page)
    return JSONResponse(status_code=403, content={"error": "Accès refusé. Ajoutez ?password=VOTRE_MOT_DE_PASSE à l'URL"})
|
|
@app.get("/api/stats")
async def get_stats(password: str = None):
    """Return usage statistics for the dashboard (password protected).

    Responds 403 when ``password`` does not match and 500, with the error
    text, when the statistics cannot be read — the same error convention as
    the feedback endpoints, instead of an error body served with HTTP 200.
    """
    if password != ADMIN_PASSWORD:
        return JSONResponse(status_code=403, content={"error": "Accès refusé"})

    async def scalar(db, sql):
        """Run *sql* and return the first column of its first row."""
        cursor = await db.execute(sql)
        row = await cursor.fetchone()
        return row[0]

    try:
        async with aiosqlite.connect(DB_PATH) as db:
            total_visitors = await scalar(db, "SELECT COUNT(*) as count FROM visitors")
            total_tasks = await scalar(db, "SELECT COUNT(*) as count FROM tasks")
            success_tasks = await scalar(db, "SELECT COUNT(*) as count FROM tasks WHERE status = 'Terminé'")
            # AVG() yields NULL on an empty table, hence the "or 0".
            avg_duration = await scalar(
                db,
                "SELECT AVG(strftime('%s', last_activity) - strftime('%s', start_time)) as avg_dur FROM visitors",
            ) or 0

        return {
            "active_users": len(active_connections),
            "total_visitors": total_visitors,
            "total_tasks": total_tasks,
            # max(1, ...) avoids dividing by zero before any task exists.
            "success_rate": round((success_tasks / max(1, total_tasks) * 100), 1),
            "avg_session_seconds": round(avg_duration),
            "queue_size": len(processing_queue),
            "active_tasks": len(active_tasks),
            "memory_usage": psutil.virtual_memory().percent
        }
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
|
|
@app.get("/api/history")
async def get_history(password: str = None):
    """Return the 50 most recent tasks as a list of dicts (password protected).

    Responds 403 when ``password`` does not match and 500, with the error
    text, when the history cannot be read — the same error convention as the
    feedback endpoints, instead of an error body served with HTTP 200.
    """
    if password != ADMIN_PASSWORD:
        return JSONResponse(status_code=403, content={"error": "Accès refusé"})
    try:
        async with aiosqlite.connect(DB_PATH) as db:
            db.row_factory = aiosqlite.Row
            cursor = await db.execute("SELECT * FROM tasks ORDER BY created_at DESC LIMIT 50")
            rows = await cursor.fetchall()
        return [dict(row) for row in rows]
    except Exception as e:
        return JSONResponse(status_code=500, content={"error": str(e)})
|
|
async def _client_has_active_task(client_id, active_ids):
    """Return True when one of *active_ids* is a task owned by *client_id*.

    Ownership is read from the ``tasks`` table, since the in-memory
    ``active_tasks`` set only holds file ids. A database error is logged and
    treated as "not processing".
    """
    if not active_ids:
        return False
    ids = list(active_ids)
    # Only "?" placeholders are interpolated; every value is bound as a parameter.
    placeholders = ",".join("?" * len(ids))
    try:
        async with aiosqlite.connect(DB_PATH) as db:
            cursor = await db.execute(
                f"SELECT 1 FROM tasks WHERE client_id = ? AND file_id IN ({placeholders}) LIMIT 1",
                (client_id, *ids),
            )
            return await cursor.fetchone() is not None
    except Exception as e:
        print(f"Error reading active tasks: {e}")
        return False


@app.get("/api/queue-status")
async def get_queue_status(client_id: str = None):
    """Report the state of the processing queue.

    Always returns queue size, active task count, limits, free slots and
    memory usage. When ``client_id`` is given, also returns ``your_position``
    (1-based position of the client's first queued task, or ``None``) and
    ``is_processing`` (whether one of the client's tasks is currently running).
    """
    # Sample memory once so "memory_usage" and "system_ready" always agree.
    memory_percent = psutil.virtual_memory().percent

    # Copy the shared state under the lock; everything else runs outside it.
    async with queue_lock:
        queued = list(processing_queue)
        active_ids = set(active_tasks)

    queue_info = {
        "queue_size": len(queued),
        "active_tasks": len(active_ids),
        "max_concurrent": MAX_CONCURRENT_TASKS,
        "max_queue": MAX_QUEUE_SIZE,
        "available_slots": MAX_QUEUE_SIZE - (len(queued) + len(active_ids)),
        "memory_usage": memory_percent,
        "system_ready": memory_percent < MEMORY_THRESHOLD
    }

    if client_id:
        queue_info["your_position"] = next(
            (idx for idx, task in enumerate(queued, start=1) if task['client_id'] == client_id),
            None,
        )
        # Running tasks have already left the queue, so they cannot be found
        # by scanning it; resolve ownership through the database instead.
        queue_info["is_processing"] = await _client_has_active_task(client_id, active_ids)

    return queue_info
|
|
if __name__ == "__main__":
    # Direct-execution entry point: serve the app on all interfaces, port 7860.
    from uvicorn import run as serve

    serve(app, host="0.0.0.0", port=7860)