| import os |
| import shutil |
| import subprocess |
| import uuid |
| import json |
| import time |
| from datetime import timedelta |
| from typing import List, Optional |
| from fastapi import FastAPI, UploadFile, File, HTTPException |
| from fastapi.responses import FileResponse |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.middleware.cors import CORSMiddleware |
| import google.generativeai as genai |
| from pydantic import BaseModel |
|
|
# FastAPI application instance (single-file app).
app = FastAPI()


# CORS is wide open (any origin/method/header).
# NOTE(review): acceptable for local development; restrict allow_origins
# before exposing this service publicly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)


# Working directory for uploads, generated .ass files and rendered videos.
# Mounted statically so the frontend can stream results from /temp/<name>.
TEMP_DIR = "temp"
os.makedirs(TEMP_DIR, exist_ok=True)
app.mount("/temp", StaticFiles(directory="temp"), name="temp")



# Gemini model used for transcription in /api/upload.
MODEL_NAME = "gemini-2.5-pro"
|
|
| |
|
|
class WordInfo(BaseModel):
    """Timing for a single word inside a subtitle segment."""

    word: str
    start: float  # seconds from video start
    end: float    # seconds from video start
|
|
class SubtitleSegment(BaseModel):
    """One subtitle line, optionally with per-word timings."""

    id: int
    start: float  # seconds
    end: float    # seconds
    text: str
    # Per-word timings used by the karaoke/progressive render modes.
    # (pydantic deep-copies mutable defaults, so the shared-list pitfall
    # of plain Python defaults does not apply here.)
    words: Optional[List[WordInfo]] = []
|
|
class StyleConfig(BaseModel):
    """Subtitle styling options selected in the frontend."""

    font: str          # frontend font key ("vazir", "lalezar", "roboto", "bangers")
    fontSize: int      # preview-scale size; scaled by 1.7 for the video in create_ass
    primaryColor: str  # '#RRGGBB' text fill color
    outlineColor: str  # '#RRGGBB' outline / box color
    backType: str      # 'solid' | 'transparent' | 'none' | anything else = classic
    marginV: int       # vertical distance from the bottom edge, px
    x: Optional[int] = 0             # horizontal offset from the horizontal center, px
    name: Optional[str] = "classic"  # preset name ("karaoke_purple", "progressive_write", ...)
|
|
class ProcessRequest(BaseModel):
    """Payload for /api/render: which file, what text, and how to style it."""

    file_id: str  # id issued by /api/upload; locates the source video in TEMP_DIR
    segments: List[SubtitleSegment]
    video_width: int
    video_height: int
    style: StyleConfig
|
|
| |
|
|
def get_video_info(path):
    """Return (width, height) of the first video stream of *path* via ffprobe.

    Falls back to (1080, 1920) — a portrait default — when ffprobe is not
    installed, the probe fails, or its JSON output lacks stream dimensions.
    """
    cmd = ["ffprobe", "-v", "error", "-select_streams", "v:0",
           "-show_entries", "stream=width,height", "-of", "json", path]
    try:
        res = subprocess.run(cmd, capture_output=True, text=True)
        data = json.loads(res.stdout)
        stream = data['streams'][0]
        return stream['width'], stream['height']
    # Narrowed from a bare `except:` (which also swallowed KeyboardInterrupt):
    # OSError  -> ffprobe binary missing / not executable
    # ValueError (incl. JSONDecodeError) -> empty or malformed probe output
    # KeyError / IndexError -> JSON present but no video stream info
    except (OSError, ValueError, KeyError, IndexError):
        return 1080, 1920
|
|
def hex_to_ass(hex_color, alpha="00"):
    """Convert a '#RRGGBB' web color into an ASS '&HAABBGGRR' color string."""
    digits = hex_color.lstrip('#')
    if len(digits) != 6:
        # Malformed input: fall back to fully opaque white.
        return "&H00FFFFFF"
    red = digits[0:2]
    green = digits[2:4]
    blue = digits[4:6]
    # ASS stores channels in reverse order (BGR), prefixed by alpha.
    return f"&H{alpha}{blue}{green}{red}"
|
|
def format_time(seconds):
    """Render a duration in seconds as an ASS timestamp: H:MM:SS.CC."""
    delta = timedelta(seconds=seconds)
    whole = int(delta.total_seconds())
    hours, remainder = divmod(whole, 3600)
    minutes, secs = divmod(remainder, 60)
    # ASS uses two-digit centiseconds; fractional part is truncated.
    centis = delta.microseconds // 10000
    return f"{hours:01d}:{minutes:02d}:{secs:02d}.{centis:02d}"
|
|
def split_long_segments(segments, max_words=6):
    """Recursively split subtitle segments so no segment exceeds *max_words*.

    Over-long segments are split at the word midpoint; the split timestamp
    is interpolated proportionally to the word count of the first half.
    Segments already short enough are passed through unchanged (same dict
    object, keys preserved).

    Args:
        segments: list of dicts with "start", "end", "text" keys.
        max_words: maximum words allowed per segment.

    Returns:
        A new list of segment dicts, each with <= max_words words.
    """
    refined = []
    for seg in segments:
        text = seg.get("text", "").strip()
        start = float(seg.get("start", 0))
        end = float(seg.get("end", 0))
        words = text.split()

        if len(words) <= max_words:
            refined.append(seg)
        else:
            mid = len(words) // 2
            first_half = words[:mid]
            second_half = words[mid:]

            # Split time proportional to the first half's share of the words.
            duration = end - start
            split_time = start + (duration * (len(first_half) / len(words)))

            refined.append({"start": start, "end": split_time, "text": " ".join(first_half)})
            refined.append({"start": split_time, "end": end, "text": " ".join(second_half)})

    # One pass only halves segments, so halves of very long segments may
    # still be too long; recurse until everything fits. This terminates
    # because every split strictly reduces word counts.
    # (Replaces the original's manual flag loop and its unused `final_pass`
    # local with an any() check — same behavior.)
    if any(len(r["text"].split()) > max_words for r in refined):
        return split_long_segments(refined, max_words)

    return refined
|
|
def estimate_word_timings(text: str, start: float, end: float) -> List[dict]:
    """Distribute word timings evenly across the [start, end] interval.

    Each word in *text* gets an equal share of the segment duration.
    Returns a list of {"word", "start", "end"} dicts with times rounded
    to two decimals, or [] when the text contains no words.
    """
    tokens = text.strip().split()
    if not tokens:
        return []

    step = (end - start) / len(tokens)

    timings = []
    cursor = start
    for token in tokens:
        next_cursor = cursor + step
        timings.append({
            "word": token,
            "start": round(cursor, 2),
            "end": round(next_cursor, 2)
        })
        cursor = next_cursor
    return timings
|
|
def create_ass(data: ProcessRequest, path: str):
    """Write an ASS subtitle file for *data* to *path*.

    Builds one "Default" style from data.style, then emits Dialogue events
    in one of three modes selected by style.name:
      - "karaoke_purple": one event per word; the active word pops with a
        purple outline animation while the rest stay plain.
      - "progressive_write": one event per word; words fade in one at a
        time and remain visible for the rest of the segment.
      - anything else: one event per segment with the plain segment text.
    Text is Persian (RTL): per-word lists are reversed and every line is
    wrapped in U+200F marks so the renderer lays it out right-to-left.
    """
    s = data.style
    # Map frontend font keys to installed font family names.
    font_map = {"vazir": "Vazirmatn", "lalezar": "Lalezar", "roboto": "Arial", "bangers": "Impact"}
    font = font_map.get(s.font, "Vazirmatn")

    primary = hex_to_ass(s.primaryColor)
    outline_c = hex_to_ass(s.outlineColor)
    back_col = "&H00000000"
    border_style = 1   # ASS BorderStyle: 1 = outline + shadow, 3 = opaque box
    outline_w = 2.0
    shadow = 0
    # 1.7 scales the frontend preview size up to the video's PlayRes space.
    font_size = int(s.fontSize * 1.7)


    is_purple_mode = (s.name == "karaoke_purple")
    is_progressive_mode = (s.name == "progressive_write")

    # Absolute subtitle anchor: horizontally centered (plus user x offset),
    # marginV pixels up from the bottom edge.
    # NOTE(review): s.x is Optional and would raise here if a client sent
    # an explicit null — presumably the frontend always sends an int.
    center_x = data.video_width // 2
    pos_x = int(center_x + s.x)
    pos_y = int(data.video_height - s.marginV)
    align_tag = 2  # \an2 = bottom-center alignment


    if is_purple_mode:
        # Purple karaoke forces its own look regardless of user styling.
        font = "Lalezar"
        primary = hex_to_ass("#FFFFFF")
        outline_c = hex_to_ass("#000000")
        back_col = "&H00000000"
        border_style = 1
        outline_w = 2
    elif s.backType == 'solid':
        # Opaque box behind the text, in the outline color.
        border_style = 3
        back_col = hex_to_ass(s.outlineColor, "00")
        outline_c = hex_to_ass(s.outlineColor, "00")
    elif s.backType == 'transparent':
        # Semi-transparent black box (&H80 alpha ≈ 50%).
        border_style = 3
        back_col = "&H80000000"
        outline_c = "&H00000000"
    elif s.backType == 'none':
        # Bare text: no outline, no shadow, no box.
        border_style = 1
        outline_w = 0
        shadow = 0
        outline_c = "&H00000000"
        back_col = "&H00000000"
    else:
        # Fallback ("classic"): outline scaled to font size plus drop shadow.
        border_style = 1
        outline_w = int(font_size * 0.04)
        shadow = 2


    # Script header + single style definition + events table header.
    header = f"""[Script Info]
ScriptType: v4.00+
PlayResX: {data.video_width}
PlayResY: {data.video_height}
WrapStyle: 2
ScaledBorderAndShadow: yes
YCbCr Matrix: TV.601
Collisions: Normal

[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,{font},{font_size},{primary},&H000000FF,{outline_c},{back_col},1,0,0,0,100,100,0,0,{border_style},{outline_w},{shadow},2,10,10,10,1

[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""


    # Karaoke-purple override tags: the active word scales up with a thick
    # purple blurred outline over 150 ms; inactive words are plain white.
    purple_active_anim = r"{\1c&HFFFFFF&}{\3c&HF020A0&}{\t(0,150,\fscx115\fscy115\bord8\blur3)}"
    purple_inactive = r"{\1c&HFFFFFF&}{\3c&H000000&}{\fscx100\fscy100\bord2\blur0}"

    # Progressive-write override tags: alpha FF = fully transparent.
    alpha_invisible = r"{\alpha&HFF}"
    # Fade the current word in over 150 ms.
    alpha_anim_in = r"{\alpha&HFF\t(0,150,\alpha&H00)}"
    alpha_visible_static = r"{\alpha&H00}"

    rtl_mark = "\u200f"  # U+200F RIGHT-TO-LEFT MARK, wraps every line
    pos_tag = f"{{\\an{align_tag}}}{{\\pos({pos_x},{pos_y})}}"


    with open(path, "w", encoding="utf-8") as f:
        f.write(header)
        for seg in data.segments:
            if is_purple_mode and seg.words:
                # One Dialogue event per word: the whole line is re-emitted
                # each time with only the current word highlighted.
                words_list = seg.words
                for i in range(len(words_list)):
                    current_word_obj = words_list[i]
                    start_t = format_time(current_word_obj.start)
                    end_t = format_time(current_word_obj.end)
                    line_parts = []
                    for j in range(len(words_list)):
                        w_txt = words_list[j].word.strip()
                        if i == j:
                            # Highlighted (currently spoken) word.
                            line_parts.append(f"{purple_active_anim}{w_txt}")
                        else:
                            # Plain styling for every other word.
                            line_parts.append(f"{purple_inactive}{w_txt}")

                    # Reverse word order for RTL layout.
                    line_parts.reverse()

                    final_text = " ".join(line_parts).replace('\n', '\\N')
                    f.write(f"Dialogue: 0,{start_t},{end_t},Default,,0,0,0,,{pos_tag}{rtl_mark}{final_text}{rtl_mark}\n")

            elif is_progressive_mode and seg.words:
                # One Dialogue event per word; each event runs until the
                # next word starts (or the segment ends, for the last word).
                words_list = seg.words
                for i in range(len(words_list)):
                    current_word_obj = words_list[i]
                    start_t = format_time(current_word_obj.start)
                    if i + 1 < len(words_list):
                        end_t = format_time(words_list[i+1].start)
                    else:
                        end_t = format_time(seg.end)

                    line_parts = []
                    for j, w in enumerate(words_list):
                        clean_w = w.word.strip()
                        if j == i:
                            # Current word fades in.
                            line_parts.append(f"{alpha_anim_in}{clean_w}")
                        elif j < i:
                            # Already-spoken words stay visible.
                            line_parts.append(f"{alpha_visible_static}{clean_w}")
                        else:
                            # Not-yet-spoken words are invisible placeholders
                            # so the line does not reflow as words appear.
                            line_parts.append(f"{alpha_invisible}{clean_w}")

                    # Reverse word order for RTL layout.
                    line_parts.reverse()

                    full_line_text = " ".join(line_parts).replace('\n', '\\N')
                    f.write(f"Dialogue: 0,{start_t},{end_t},Default,,0,0,0,,{pos_tag}{rtl_mark}{full_line_text}{rtl_mark}\n")
            else:
                # Classic mode: one event per segment. ZWNJ (U+200C) becomes
                # a space; embedded newlines become ASS hard breaks.
                clean_text = seg.text.strip().replace('\u200c', ' ').replace('\n', '\\N')
                start = format_time(seg.start)
                end = format_time(seg.end)
                f.write(f"Dialogue: 0,{start},{end},Default,,0,0,0,,{pos_tag}{rtl_mark}{clean_text}{rtl_mark}\n")
|
|
@app.get("/")
async def index():
    """Serve the single-page frontend."""
    return FileResponse("index.html")
|
|
@app.post("/api/upload")
async def upload(file: UploadFile = File(...)):
    """Accept a video upload, transcribe it with Gemini, return timed segments.

    Saves the file under TEMP_DIR, probes its dimensions, uploads it to the
    Gemini Files API, asks the model for Persian subtitles as JSON, then
    splits long segments and estimates per-word timings.

    Raises HTTPException(500) when the API key is missing/invalid, the
    Gemini upload or generation fails, or the model output is not JSON.
    """
    GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
    if not GEMINI_API_KEY:
        raise HTTPException(status_code=500, detail="کلید API تنظیم نشده است.")
    # SECURITY NOTE(review): this literal looks like a real Google API key
    # committed to source. The check rejects it as a known-bad/leaked
    # default, but the key should be revoked and the string removed.
    if GEMINI_API_KEY.startswith("AIzaSyDbLpnR8Ij-oVQqRITZh4541c3z4rDRKNI"):
        raise HTTPException(status_code=500, detail="کلید API نامعتبر است.")

    genai.configure(api_key=GEMINI_API_KEY)

    fid = str(uuid.uuid4())[:8]
    # SECURITY FIX: the extension comes from the client-controlled filename;
    # the raw `split('.')[-1]` could contain path separators (e.g.
    # "a.mp4/../../x") and escape TEMP_DIR. Keep alphanumerics only.
    raw_ext = file.filename.split('.')[-1] if file.filename else "mp4"
    ext = "".join(c for c in raw_ext if c.isalnum()) or "mp4"
    path = f"{TEMP_DIR}/{fid}.{ext}"

    with open(path, "wb") as f:
        shutil.copyfileobj(file.file, f)

    w, h = get_video_info(path)

    try:
        video_file = genai.upload_file(path=path)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"خطا در اتصال به گوگل: {str(e)}")

    # Poll until Google finishes server-side processing of the upload.
    # NOTE(review): time.sleep blocks the event loop; fine for a single-user
    # tool, but should become `await asyncio.sleep(1)` under real load.
    while video_file.state.name == "PROCESSING":
        time.sleep(1)
        video_file = genai.get_file(video_file.name)

    if video_file.state.name == "FAILED":
        raise HTTPException(status_code=500, detail="پردازش ویدیو ناموفق بود.")

    model = genai.GenerativeModel(MODEL_NAME)

    prompt = """
You are a professional subtitle synchronization engine.
Task: Transcribe the Persian (Farsi) audio from this video.

CRITICAL SYNCHRONIZATION RULES:
1. **START TIME**: Mark the timestamp EXACTLY when the first sound of the first word starts. Do NOT include silence before the word. Be aggressive with the start time.
2. **SHORT SEGMENTS**: Keep segments extremely short (Max 6 words).
3. **NO DELAY**: Do not wait for the sentence to finish processing. Timestamp the onset of speech.

Output ONLY valid JSON:
[
{"start": 0.00, "end": 1.25, "text": "سلام دوستان"},
{"start": 1.25, "end": 3.50, "text": "به این ویدیو خوش آمدید"}
]
"""

    try:
        response = model.generate_content(
            [video_file, prompt],
            generation_config={"response_mime_type": "application/json"}
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"خطا در مدل: {str(e)}")

    # The model sometimes wraps the JSON in markdown fences despite the MIME
    # hint; strip them and retry before giving up. Narrowed from bare
    # `except:` — json.loads raises ValueError (JSONDecodeError) or
    # TypeError (non-str input).
    try:
        segments_raw = json.loads(response.text)
    except (TypeError, ValueError):
        clean_text = response.text.replace("```json", "").replace("```", "").strip()
        try:
            segments_raw = json.loads(clean_text)
        except (TypeError, ValueError):
            raise HTTPException(status_code=500, detail="خطا در فرمت خروجی مدل.")

    processed_segments = split_long_segments(segments_raw, max_words=6)

    final_output = []

    # Shift all timestamps slightly earlier: the model tends to mark speech
    # onset a beat late, so subtitles would otherwise lag the audio.
    SYNC_OFFSET = 0.15

    for i, s in enumerate(processed_segments):
        start_t = max(0.0, float(s.get("start", 0)) - SYNC_OFFSET)
        end_t = max(0.0, float(s.get("end", 0)) - SYNC_OFFSET)

        text_t = s.get("text", "").strip()
        words_data = estimate_word_timings(text_t, start_t, end_t)

        final_output.append({
            "id": i,
            "start": start_t,
            "end": end_t,
            "text": text_t,
            "words": words_data
        })

    return {"file_id": fid, "url": f"/temp/{fid}.{ext}", "width": w, "height": h, "segments": final_output}
|
|
@app.post("/api/render")
async def render(req: ProcessRequest):
    """Burn the subtitles described by *req* into the uploaded video.

    Writes a .ass file, locates the original upload by file_id, and runs
    ffmpeg to hard-code the subtitles. Returns the URL of the rendered file.

    Raises HTTPException(400) for a malformed file_id, 404 when no source
    video matches it, and 500 when ffmpeg fails.
    """
    # SECURITY FIX: file_id comes from the client and is interpolated into
    # filesystem paths; restrict it to the alphanumeric ids that
    # /api/upload actually issues (uuid4 hex prefix) to block traversal.
    if not req.file_id.isalnum():
        raise HTTPException(status_code=400, detail="Invalid file_id.")

    ass_file = f"{TEMP_DIR}/{req.file_id}.ass"
    create_ass(req, ass_file)

    # Locate the original upload: same id prefix, but neither the subtitle
    # file nor a previous render output.
    inp = None
    for f in os.listdir(TEMP_DIR):
        if f.startswith(req.file_id) and not f.endswith('.ass') and "_final_" not in f:
            inp = f"{TEMP_DIR}/{f}"
            break
    if inp is None:
        # BUGFIX: the original fell through with inp=None and crashed the
        # ffmpeg invocation with a TypeError; report a proper 404 instead.
        raise HTTPException(status_code=404, detail="Source video not found.")

    # Remove stale renders for this id so the temp dir doesn't grow unbounded.
    for f in os.listdir(TEMP_DIR):
        if f.startswith(f"{req.file_id}_final_"):
            try:
                os.remove(f"{TEMP_DIR}/{f}")
            except OSError:
                # Best-effort cleanup; a locked/missing file is not fatal.
                pass

    # Timestamped output name busts browser/player caches between renders.
    timestamp = int(time.time())
    out_filename = f"{req.file_id}_final_{timestamp}.mp4"
    out_path = f"{TEMP_DIR}/{out_filename}"

    cmd = ["ffmpeg", "-y", "-i", inp, "-vf", f"ass={ass_file}", "-c:v", "libx264", "-preset", "ultrafast", "-crf", "26", "-c:a", "copy", out_path]
    proc = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    # BUGFIX: the original ignored ffmpeg's exit status and returned a URL
    # to a file that may never have been created.
    if proc.returncode != 0 or not os.path.exists(out_path):
        raise HTTPException(status_code=500, detail="Rendering failed.")

    return {"url": f"/temp/{out_filename}"}