"""AI Subtitle Studio Pro backend.

Exposes a small FastAPI application that:

* transcribes an uploaded media file with a faster-whisper model
  (``POST /api/analyze``),
* burns user-edited subtitles into the media with ffmpeg via a generated
  ASS subtitle file (``POST /api/render``),
* serves rendered files from the temp directory (``GET /download/{filename}``).
"""
import logging
import os
import re
import shutil
import subprocess
import uuid
from datetime import timedelta
from typing import List

from fastapi import FastAPI, UploadFile, File
from fastapi.concurrency import run_in_threadpool
from fastapi.responses import FileResponse, JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from faster_whisper import WhisperModel
from pydantic import BaseModel

logger = logging.getLogger(__name__)

app = FastAPI(title="AI Subtitle Studio Pro")

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

TEMP_DIR = "temp"
os.makedirs(TEMP_DIR, exist_ok=True)

# Identifiers that end up inside filesystem paths and ffmpeg arguments are
# restricted to a conservative character set so that client-supplied values
# cannot escape TEMP_DIR or inject filter options.
_FILE_ID_RE = re.compile(r"[A-Za-z0-9-]{1,64}")
_FILE_EXT_RE = re.compile(r"[A-Za-z0-9]{1,10}")
_HEX_COLOR_RE = re.compile(r"[0-9A-Fa-f]{6}")

print("⚡ [SYSTEM] Initializing AI Neural Network...")
model = WhisperModel("small", device="cpu", compute_type="int8")
print("✅ [SYSTEM] AI Core Ready.")


# --- MODELS ---
class SubtitleSegment(BaseModel):
    """One subtitle cue; ``start``/``end`` are passed to ``format_time_ass`` as seconds."""

    id: int
    start: float
    end: float
    text: str


class StyleConfig(BaseModel):
    """Client-selected styling that is written into the ASS ``Style:`` line."""

    font: str  # key into the font map in generate_ass_file; unknown keys fall back to Arial
    fontSize: int
    primaryColor: str  # "#RRGGBB" (leading '#' optional)
    outlineColor: str  # "#RRGGBB" (leading '#' optional)
    backType: str  # 'solid' and 'transparent' select a box background; anything else an outline
    outlineWidth: float
    marginV: int
    alignment: int


class ProcessRequest(BaseModel):
    """Request body of ``POST /api/render``."""

    file_id: str  # identifier previously returned by /api/analyze
    segments: List[SubtitleSegment]
    style: StyleConfig


# --- HELPERS ---
def hex_to_ass(hex_color, alpha="00"):
    """Convert an ``RRGGBB`` colour (optionally prefixed with ``#``) to ASS ``&HAABBGGRR``.

    Args:
        hex_color: Colour string such as ``"#ff8800"``.
        alpha: Two hex digits placed in the alpha position of the result.

    Returns:
        The ASS colour string, or ``"&H00FFFFFF"`` when ``hex_color`` is not
        exactly six hexadecimal digits.
    """
    hex_color = hex_color.lstrip('#')
    # Checking the digits (not just the length) keeps stray characters such
    # as commas out of the comma-separated Style line.
    if not _HEX_COLOR_RE.fullmatch(hex_color):
        return "&H00FFFFFF"
    r, g, b = hex_color[0:2], hex_color[2:4], hex_color[4:6]
    return f"&H{alpha}{b}{g}{r}"


def format_time_ass(seconds: float):
    """Format a time offset in seconds as an ASS timestamp ``H:MM:SS.cc``.

    Negative offsets are clamped to zero because the floor-division
    arithmetic below would otherwise yield a malformed timestamp.
    Centiseconds are truncated, not rounded.
    """
    td = timedelta(seconds=max(seconds, 0.0))
    total_seconds = int(td.total_seconds())
    hours, remainder = divmod(total_seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    centisecs = td.microseconds // 10000
    return f"{hours:01d}:{minutes:02d}:{secs:02d}.{centisecs:02d}"


def generate_ass_file(data: ProcessRequest, output_path: str):
    """Write an ASS subtitle file for ``data`` to ``output_path``.

    The file contains a single ``Default`` style built from ``data.style``
    and one ``Dialogue`` event per entry in ``data.segments``.

    Args:
        data: Segments and style to serialise.
        output_path: Destination file path; overwritten if it exists.
    """
    s = data.style

    font_map = {
        "vazir": "Vazirmatn",
        "lalezar": "Lalezar",
        "roboto": "Roboto",
        "bangers": "Bangers",
    }
    font_name = font_map.get(s.font, "Arial")

    primary = hex_to_ass(s.primaryColor)
    outline = hex_to_ass(s.outlineColor)
    back_color = "&H00000000"

    # BorderStyle 3 selects the box-background mode, 1 the plain outline mode.
    if s.backType == 'solid':
        border_style = 3
    elif s.backType == 'transparent':
        border_style = 3
        outline = "&H80000000"
    else:
        border_style = 1

    header = f"""[Script Info]
ScriptType: v4.00+
PlayResX: 1080
PlayResY: 1920
WrapStyle: 1
ScaledBorderAndShadow: yes

[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,{font_name},{s.fontSize},{primary},&H000000FF,{outline},{back_color},1,0,0,0,100,100,0,0,{border_style},{s.outlineWidth},0,{s.alignment},10,10,{s.marginV},1

[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""

    with open(output_path, "w", encoding="utf-8") as f:
        f.write(header)
        for seg in data.segments:
            start = format_time_ass(seg.start)
            end = format_time_ass(seg.end)
            # Normalise every line-ending flavour before converting to the
            # ASS hard line break, so no raw CR/LF can split a Dialogue line.
            clean_text = (
                seg.text.strip()
                .replace("\r\n", "\n")
                .replace("\r", "\n")
                .replace("\n", "\\N")
            )
            f.write(f"Dialogue: 0,{start},{end},Default,,0,0,0,,{clean_text}\n")


def _safe_extension(filename):
    """Return a lower-cased, path-safe extension for an uploaded file name.

    Falls back to ``"bin"`` when the name is missing or its last
    dot-separated component is not a short alphanumeric token.
    """
    ext = (filename or "").rsplit(".", 1)[-1].lower()
    return ext if _FILE_EXT_RE.fullmatch(ext) else "bin"


def _save_upload(file, input_path):
    """Copy the uploaded file's contents to ``input_path``."""
    with open(input_path, "wb") as f:
        shutil.copyfileobj(file.file, f)


def _transcribe(input_path):
    """Transcribe ``input_path`` with the module-level model and return segment dicts."""
    segments_gen, _ = model.transcribe(input_path, language="fa", beam_size=5)
    # The segments object is consumed here so that all of the work happens
    # inside this (threadpool-executed) function.
    return [
        {
            "id": idx,
            "start": seg.start,
            "end": seg.end,
            "text": seg.text.strip(),
        }
        for idx, seg in enumerate(segments_gen)
    ]


def _remove_quietly(path):
    """Delete ``path`` if possible; a missing or undeletable file is ignored."""
    try:
        os.remove(path)
    except OSError:
        pass


# --- ROUTES ---
@app.get("/")
async def home():
    """Serve ``index.html`` (resolved relative to the working directory)."""
    return FileResponse("index.html")


@app.post("/api/analyze")
async def analyze_media(file: UploadFile = File(...)):
    """Store an uploaded media file and return its transcription.

    Returns:
        ``{"status": "success", "file_id": ..., "segments": [...]}`` on
        success, or a 500 JSON response ``{"error": ...}`` on failure.
    """
    file_id = str(uuid.uuid4())[:12]
    # The extension comes from the client, so it is sanitised before being
    # used in a path; lower-casing lets render_video's lookup find it.
    input_path = f"{TEMP_DIR}/{file_id}.{_safe_extension(file.filename)}"

    try:
        # Both steps are blocking, so they run in the threadpool rather than
        # on the event loop.
        await run_in_threadpool(_save_upload, file, input_path)
        results = await run_in_threadpool(_transcribe, input_path)
    except Exception as e:
        logger.exception("Analysis failed for %s", input_path)
        # The file_id is never returned on failure, so the upload would be
        # unreachable; remove it instead of leaving it behind.
        _remove_quietly(input_path)
        return JSONResponse(status_code=500, content={"error": str(e)})

    return {"status": "success", "file_id": file_id, "segments": results}


@app.post("/api/render")
async def render_video(data: ProcessRequest):
    """Burn the supplied subtitles into the previously uploaded media file.

    Returns:
        ``{"status": "done", "url": ...}`` on success; a 404 JSON response
        when no source file matches ``data.file_id``; a 500 JSON response
        ``{"error": ...}`` when rendering fails.
    """
    # file_id is interpolated into paths and into the ffmpeg filter string,
    # so anything outside the expected character set is treated as unknown.
    if not _FILE_ID_RE.fullmatch(data.file_id):
        return JSONResponse(status_code=404, content={"error": "Source file lost"})

    try:
        exts = ["mp4", "mov", "avi", "mkv", "mp3"]
        candidates = (f"{TEMP_DIR}/{data.file_id}.{ext}" for ext in exts)
        input_path = next((p for p in candidates if os.path.exists(p)), None)

        if not input_path:
            return JSONResponse(status_code=404, content={"error": "Source file lost"})

        ass_path = f"{TEMP_DIR}/{data.file_id}.ass"
        output_path = f"{TEMP_DIR}/{data.file_id}_final.mp4"

        generate_ass_file(data, ass_path)

        cmd = [
            "ffmpeg", "-y",
            "-loglevel", "error",
            "-i", input_path,
            "-vf", f"ass={ass_path}",
            "-c:v", "libx264",
            "-preset", "ultrafast",
            "-crf", "26",
            "-c:a", "aac",
            output_path,
        ]

        # ffmpeg is blocking; run it in the threadpool. stderr is captured
        # (instead of discarded) so failures can be diagnosed from the log.
        await run_in_threadpool(
            subprocess.run,
            cmd,
            check=True,
            stdin=subprocess.DEVNULL,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
        )

        return {"status": "done", "url": f"/download/{data.file_id}_final.mp4"}

    except subprocess.CalledProcessError as e:
        stderr_text = (e.stderr or b"").decode("utf-8", errors="replace")
        logger.error(
            "ffmpeg exited with code %s for %s: %s",
            e.returncode, data.file_id, stderr_text,
        )
        return JSONResponse(
            status_code=500,
            content={"error": f"Rendering failed (ffmpeg exit code {e.returncode})"},
        )
    except Exception as e:
        logger.exception("Rendering failed for %s", data.file_id)
        return JSONResponse(status_code=500, content={"error": str(e)})


@app.get("/download/{filename}")
async def get_file(filename: str):
    """Serve a file from ``TEMP_DIR`` by name, or a 404 JSON response."""
    path = f"{TEMP_DIR}/{filename}"
    # Only a plain file name that refers to a regular file is served; this
    # rejects "..", directories and (on Windows) backslash-separated paths.
    if filename == os.path.basename(filename) and os.path.isfile(path):
        return FileResponse(path)
    return JSONResponse(status_code=404, content={"error": "File missing"})