| import os |
| import shutil |
| import subprocess |
| import uuid |
| import json |
| import time |
| from datetime import timedelta |
| from typing import List, Optional |
| from fastapi import FastAPI, UploadFile, File |
| from fastapi.responses import FileResponse |
| from fastapi.staticfiles import StaticFiles |
| from fastapi.middleware.cors import CORSMiddleware |
| from faster_whisper import WhisperModel |
| from pydantic import BaseModel |
| import arabic_reshaper |
|
|
| app = FastAPI() |
|
|
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
| TEMP_DIR = "temp" |
| os.makedirs(TEMP_DIR, exist_ok=True) |
| app.mount("/temp", StaticFiles(directory="temp"), name="temp") |
|
|
| model = WhisperModel("small", device="cpu", compute_type="int8") |
|
|
| |
| class WordInfo(BaseModel): |
| word: str |
| start: float |
| end: float |
|
|
| class SubtitleSegment(BaseModel): |
| id: int |
| start: float |
| end: float |
| text: str |
| words: Optional[List[WordInfo]] = [] |
|
|
| class StyleConfig(BaseModel): |
| font: str |
| fontSize: int |
| primaryColor: str |
| outlineColor: str |
| backType: str |
| marginV: int |
| name: Optional[str] = "classic" |
|
|
| class ProcessRequest(BaseModel): |
| file_id: str |
| segments: List[SubtitleSegment] |
| video_width: int |
| video_height: int |
| style: StyleConfig |
|
|
| |
| def get_video_info(path): |
| try: |
| cmd = ["ffprobe", "-v", "error", "-select_streams", "v:0", |
| "-show_entries", "stream=width,height", "-of", "json", path] |
| res = subprocess.run(cmd, capture_output=True, text=True) |
| data = json.loads(res.stdout) |
| return data['streams'][0]['width'], data['streams'][0]['height'] |
| except: |
| return 1080, 1920 |
|
|
| def hex_to_ass(hex_color, alpha="00"): |
| hex_c = hex_color.lstrip('#') |
| if len(hex_c) != 6: return "&H00FFFFFF" |
| r, g, b = hex_c[0:2], hex_c[2:4], hex_c[4:6] |
| return f"&H{alpha}{b}{g}{r}" |
|
|
| def format_time(seconds): |
| td = timedelta(seconds=seconds) |
| total = int(td.total_seconds()) |
| h, m, s = total//3600, (total%3600)//60, total%60 |
| cs = int(td.microseconds/10000) |
| return f"{h:01d}:{m:02d}:{s:02d}.{cs:02d}" |
|
|
| def fix_persian_chars(text): |
| """حروف فارسی را میچسباند (Reshape)""" |
| if not text: return "" |
| try: |
| |
| |
| return arabic_reshaper.reshape(text) |
| except: |
| return text |
|
|
| def create_ass(data: ProcessRequest, path: str): |
| s = data.style |
| |
| |
| font_map = { |
| "vazir": "Vazirmatn", |
| "lalezar": "Lalezar", |
| "roboto": "Roboto", |
| "bangers": "Bangers" |
| } |
| font_name = font_map.get(s.font, "Vazirmatn") |
| |
| |
| |
| use_bold = 1 |
| if font_name in ["Lalezar", "Bangers"]: |
| use_bold = 0 |
| |
| primary = hex_to_ass(s.primaryColor) |
| outline_c = hex_to_ass(s.outlineColor) |
| back_col = "&H00000000" |
| border_style = 1 |
| outline_w = 2.0 |
| shadow = 0 |
| font_size = int(s.fontSize * 1.7) |
| |
| is_purple_mode = (s.name == "karaoke_purple") |
| |
| if is_purple_mode: |
| font_name = "Lalezar" |
| use_bold = 0 |
| primary = hex_to_ass("#FFFFFF") |
| outline_c = hex_to_ass("#000000") |
| back_col = "&H00000000" |
| border_style = 1 |
| outline_w = 2 |
| |
| elif s.backType == 'solid': |
| border_style = 3 |
| back_col = hex_to_ass(s.outlineColor, "00") |
| outline_c = hex_to_ass(s.outlineColor, "00") |
| elif s.backType == 'transparent': |
| border_style = 3 |
| back_col = "&H80000000" |
| outline_c = "&H00000000" |
| else: |
| border_style = 1 |
| outline_w = int(font_size * 0.04) |
| shadow = 2 |
|
|
| |
| header = f"""[Script Info] |
| ScriptType: v4.00+ |
| PlayResX: {data.video_width} |
| PlayResY: {data.video_height} |
| WrapStyle: 2 |
| ScaledBorderAndShadow: yes |
| YCbCr Matrix: TV.601 |
| Collisions: Normal |
| |
| [V4+ Styles] |
| Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding |
| Style: Default,{font_name},{font_size},{primary},&H000000FF,{outline_c},{back_col},{use_bold},0,0,0,100,100,0,0,{border_style},{outline_w},{shadow},2,10,10,{s.marginV},1 |
| |
| [Events] |
| Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text |
| """ |
| |
| purple_active = r"{\1c&HFFFFFF&}{\3c&HF020A0&}{\bord9}{\blur3}" |
| purple_inactive = r"{\1c&HFFFFFF&}{\3c&H000000&}{\bord2}{\blur0}" |
|
|
| with open(path, "w", encoding="utf-8") as f: |
| f.write(header) |
| for seg in data.segments: |
| |
| |
| if is_purple_mode and seg.words: |
| words_list = seg.words |
| for i in range(len(words_list)): |
| current_word_obj = words_list[i] |
| start_t = format_time(current_word_obj.start) |
| end_t = format_time(current_word_obj.end) |
| |
| line_parts = [] |
| |
| |
| |
| |
| |
| |
| |
| |
| reversed_indices = range(len(words_list) - 1, -1, -1) |
| |
| for j in reversed_indices: |
| raw_word = words_list[j].word.strip() |
| fixed_word = fix_persian_chars(raw_word) |
| |
| if i == j: |
| line_parts.append(f"{purple_active}{fixed_word}") |
| else: |
| line_parts.append(f"{purple_inactive}{fixed_word}") |
| |
| final_text = " ".join(line_parts).replace('\n', '\\N') |
| f.write(f"Dialogue: 0,{start_t},{end_t},Default,,0,0,0,,{final_text}\n") |
| |
| |
| else: |
| clean_text = seg.text.strip().replace('\u200c', ' ') |
| |
| |
| words = clean_text.split() |
| reversed_words = [] |
| for w in reversed(words): |
| reversed_words.append(fix_persian_chars(w)) |
| |
| final_text_fixed = " ".join(reversed_words).replace('\n', '\\N') |
| |
| start = format_time(seg.start) |
| end = format_time(seg.end) |
| f.write(f"Dialogue: 0,{start},{end},Default,,0,0,0,,{final_text_fixed}\n") |
|
|
| |
|
|
| @app.get("/") |
| async def index(): |
| return FileResponse("index.html") |
|
|
| @app.post("/api/upload") |
| async def upload(file: UploadFile = File(...)): |
| fid = str(uuid.uuid4())[:8] |
| ext = file.filename.split('.')[-1] |
| path = f"{TEMP_DIR}/{fid}.{ext}" |
| |
| with open(path, "wb") as f: |
| shutil.copyfileobj(file.file, f) |
| |
| w, h = get_video_info(path) |
| |
| segments_gen, _ = model.transcribe(path, language="fa", word_timestamps=True) |
| segments = list(segments_gen) |
| |
| MAX_WORDS_PER_SEGMENT = 6 |
| refined_segments = [] |
| current_words_bucket = [] |
| |
| for seg in segments: |
| if not seg.words: |
| refined_segments.append({ |
| "start": seg.start, "end": seg.end, "text": seg.text.strip(), "words": [] |
| }) |
| continue |
| |
| for word in seg.words: |
| current_words_bucket.append({"word": word.word, "start": word.start, "end": word.end}) |
| if len(current_words_bucket) >= MAX_WORDS_PER_SEGMENT: |
| text = "".join([w['word'] for w in current_words_bucket]).strip() |
| refined_segments.append({ |
| "start": current_words_bucket[0]['start'], |
| "end": current_words_bucket[-1]['end'], |
| "text": text, |
| "words": list(current_words_bucket) |
| }) |
| current_words_bucket = [] |
| |
| if current_words_bucket: |
| text = "".join([w['word'] for w in current_words_bucket]).strip() |
| refined_segments.append({ |
| "start": current_words_bucket[0]['start'], |
| "end": current_words_bucket[-1]['end'], |
| "text": text, |
| "words": list(current_words_bucket) |
| }) |
|
|
| final_output = [] |
| for i, s in enumerate(refined_segments): |
| final_output.append({ |
| "id": i, |
| "start": s['start'], |
| "end": s['end'], |
| "text": s['text'], |
| "words": s['words'] |
| }) |
| |
| return {"file_id": fid, "url": f"/temp/{fid}.{ext}", "width": w, "height": h, "segments": final_output} |
|
|
| @app.post("/api/render") |
| async def render(req: ProcessRequest): |
| ass_file = f"{TEMP_DIR}/{req.file_id}.ass" |
| create_ass(req, ass_file) |
| |
| inp = None |
| for f in os.listdir(TEMP_DIR): |
| if f.startswith(req.file_id) and not f.endswith('.ass') and "_final_" not in f: |
| inp = f"{TEMP_DIR}/{f}" |
| break |
| |
| for f in os.listdir(TEMP_DIR): |
| if f.startswith(f"{req.file_id}_final_"): |
| try: os.remove(f"{TEMP_DIR}/{f}") |
| except: pass |
|
|
| timestamp = int(time.time()) |
| out_filename = f"{req.file_id}_final_{timestamp}.mp4" |
| out_path = f"{TEMP_DIR}/{out_filename}" |
| |
| cmd = [ |
| "ffmpeg", "-y", "-i", inp, |
| "-vf", f"ass={ass_file}", |
| "-c:v", "libx264", "-preset", "ultrafast", "-crf", "26", |
| "-c:a", "copy", out_path |
| ] |
| subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) |
| |
| return {"url": f"/temp/{out_filename}"} |