# Zirnavis3 / app.py — by Hamed744 (commit c26ad4c, verified)
import os
import shutil
import subprocess
import uuid
import json
import time
from datetime import timedelta
from typing import List, Optional
from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
import google.generativeai as genai
from pydantic import BaseModel
app = FastAPI()
# Wide-open CORS so the bundled frontend can call this API from any origin
# (no credentials are used).
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Working directory for uploads, generated .ass files, and rendered videos;
# served at /temp so the frontend can fetch results directly.
TEMP_DIR = "temp"
os.makedirs(TEMP_DIR, exist_ok=True)
app.mount("/temp", StaticFiles(directory="temp"), name="temp")
# --- Model configured as instructed ---
MODEL_NAME = "gemini-2.5-pro"
# --- Data models ---
class WordInfo(BaseModel):
    """A single transcribed word with its start/end time in seconds."""
    word: str
    start: float
    end: float
class SubtitleSegment(BaseModel):
    """One subtitle line shown from `start` to `end` (seconds)."""
    id: int
    start: float
    end: float
    text: str
    # Per-word timings used by the karaoke/progressive styles;
    # may be empty for the classic static mode.
    words: Optional[List[WordInfo]] = []
class StyleConfig(BaseModel):
    """Visual style selected in the frontend editor."""
    font: str          # frontend font id (e.g. "vazir"); mapped to a family in create_ass
    fontSize: int      # UI size; scaled by 1.7 for the video resolution
    primaryColor: str  # '#RRGGBB' text colour
    outlineColor: str  # '#RRGGBB' outline / box colour
    backType: str      # 'solid' | 'transparent' | 'none' | other (default look)
    marginV: int       # pixels up from the bottom edge of the video
    x: Optional[int] = 0             # horizontal offset from centre, pixels
    name: Optional[str] = "classic"  # "classic" | "karaoke_purple" | "progressive_write"
class ProcessRequest(BaseModel):
    """Payload for /api/render: which upload to burn, its segments, and the style."""
    file_id: str
    segments: List[SubtitleSegment]
    video_width: int
    video_height: int
    style: StyleConfig
# --- Helper functions ---
def get_video_info(path):
    """Return (width, height) of the first video stream of *path* via ffprobe.

    Falls back to a 1080x1920 portrait default when ffprobe is missing,
    fails, or emits unparsable output, so callers never have to handle
    probe errors themselves.
    """
    cmd = ["ffprobe", "-v", "error", "-select_streams", "v:0",
           "-show_entries", "stream=width,height", "-of", "json", path]
    try:
        res = subprocess.run(cmd, capture_output=True, text=True)
        data = json.loads(res.stdout)
        stream = data['streams'][0]
        return stream['width'], stream['height']
    except (OSError, ValueError, LookupError):
        # OSError: ffprobe binary missing; ValueError: empty/invalid JSON;
        # LookupError: no 'streams' entry or missing width/height keys.
        # (Was a bare `except:` that also swallowed KeyboardInterrupt.)
        return 1080, 1920
def hex_to_ass(hex_color, alpha="00"):
    """Convert a CSS hex colour to an ASS colour string '&HAABBGGRR'.

    ASS stores colours little-endian (blue first) with an alpha prefix
    (00 = opaque, FF = fully transparent).  Accepts '#RRGGBB' and the
    3-digit shorthand '#RGB'; anything else falls back to white while
    preserving the requested alpha.
    """
    hex_c = hex_color.lstrip('#')
    if len(hex_c) == 3:
        # Expand CSS shorthand, e.g. 'f0a' -> 'ff00aa'.
        hex_c = "".join(ch * 2 for ch in hex_c)
    if len(hex_c) != 6:
        # Bug fix: original returned a hard-coded "&H00FFFFFF" here,
        # ignoring the caller's alpha.
        return f"&H{alpha}FFFFFF"
    r, g, b = hex_c[0:2], hex_c[2:4], hex_c[4:6]
    return f"&H{alpha}{b}{g}{r}"
def format_time(seconds):
    """Format a duration in seconds as an ASS timestamp 'H:MM:SS.CC'.

    ASS event times use single-digit hours and centisecond precision.
    """
    delta = timedelta(seconds=seconds)
    whole = int(delta.total_seconds())
    centis = delta.microseconds // 10000
    hours, remainder = divmod(whole, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours:01d}:{minutes:02d}:{secs:02d}.{centis:02d}"
def split_long_segments(segments, max_words=6):
    """Recursively bisect any segment whose text exceeds *max_words* words.

    The split time is interpolated proportionally to word count, so each
    half keeps a duration matching its share of the text.  Segments that
    already fit are passed through unchanged (keeping any extra keys).
    Recurses until every segment is within the limit.
    """
    result = []
    for seg in segments:
        tokens = seg.get("text", "").strip().split()
        if len(tokens) <= max_words:
            result.append(seg)
            continue
        start = float(seg.get("start", 0))
        end = float(seg.get("end", 0))
        mid = len(tokens) // 2
        head, tail = tokens[:mid], tokens[mid:]
        # Proportional cut point: head's share of the total word count.
        cut = start + (end - start) * (len(head) / len(tokens))
        result.append({"start": start, "end": cut, "text": " ".join(head)})
        result.append({"start": cut, "end": end, "text": " ".join(tail)})
    # A single bisection may still leave over-long halves; recurse if so.
    if any(len(item["text"].split()) > max_words for item in result):
        return split_long_segments(result, max_words)
    return result
def estimate_word_timings(text: str, start: float, end: float) -> List[dict]:
    """Spread the interval [start, end] evenly across the words of *text*.

    Returns one {'word', 'start', 'end'} dict per word, with times rounded
    to two decimals; an empty list when the text contains no words.
    """
    tokens = text.strip().split()
    if not tokens:
        return []
    step = (end - start) / len(tokens)
    timings = []
    cursor = start
    for tok in tokens:
        timings.append({
            "word": tok,
            "start": round(cursor, 2),
            "end": round(cursor + step, 2)
        })
        cursor += step
    return timings
def create_ass(data: ProcessRequest, path: str):
    """Generate an ASS subtitle file at *path* from the request's segments/style.

    Three rendering modes, selected by style.name:
      * "karaoke_purple": one Dialogue line per word; the active word is
        zoomed and recoloured while the rest of the line stays static.
      * "progressive_write": words fade in one by one and stay visible.
      * anything else: one static Dialogue line per segment.
    Every line is wrapped in U+200F marks for right-to-left (Persian) display.
    """
    s = data.style
    # Map frontend font ids to installed font family names;
    # unknown ids fall back to Vazirmatn.
    font_map = {"vazir": "Vazirmatn", "lalezar": "Lalezar", "roboto": "Arial", "bangers": "Impact"}
    font = font_map.get(s.font, "Vazirmatn")
    primary = hex_to_ass(s.primaryColor)
    outline_c = hex_to_ass(s.outlineColor)
    back_col = "&H00000000"
    border_style = 1  # ASS BorderStyle: 1 = outline+shadow, 3 = opaque box
    outline_w = 2.0
    shadow = 0
    # Scale the UI font size up for the video resolution.
    font_size = int(s.fontSize * 1.7)
    is_purple_mode = (s.name == "karaoke_purple")
    is_progressive_mode = (s.name == "progressive_write")
    # Horizontal centre plus the user's x offset; vertical position is
    # measured up from the bottom edge via marginV.
    center_x = data.video_width // 2
    pos_x = int(center_x + s.x)
    pos_y = int(data.video_height - s.marginV)
    align_tag = 2  # \an2 = bottom-centre anchor
    if is_purple_mode:
        # Karaoke mode forces its own look regardless of the user style.
        font = "Lalezar"
        primary = hex_to_ass("#FFFFFF")
        outline_c = hex_to_ass("#000000")
        back_col = "&H00000000"
        border_style = 1
        outline_w = 2
    elif s.backType == 'solid':
        # Opaque box in the outline colour behind the text.
        border_style = 3
        back_col = hex_to_ass(s.outlineColor, "00")
        outline_c = hex_to_ass(s.outlineColor, "00")
    elif s.backType == 'transparent':
        # Semi-transparent black box (&H80 alpha = 50%).
        border_style = 3
        back_col = "&H80000000"
        outline_c = "&H00000000"
    elif s.backType == 'none':
        # Bare text: no outline, no shadow, no box.
        border_style = 1
        outline_w = 0
        shadow = 0
        outline_c = "&H00000000"
        back_col = "&H00000000"
    else:
        # Default look: outline proportional to font size plus a drop shadow.
        border_style = 1
        outline_w = int(font_size * 0.04)
        shadow = 2
    header = f"""[Script Info]
ScriptType: v4.00+
PlayResX: {data.video_width}
PlayResY: {data.video_height}
WrapStyle: 2
ScaledBorderAndShadow: yes
YCbCr Matrix: TV.601
Collisions: Normal
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,{font},{font_size},{primary},&H000000FF,{outline_c},{back_col},1,0,0,0,100,100,0,0,{border_style},{outline_w},{shadow},2,10,10,10,1
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
    # --- Animated style definitions ---
    # \t gives a smooth animation (scale + colour change);
    # \fscx115\fscy115 zooms the active word to 115%.
    purple_active_anim = r"{\1c&HFFFFFF&}{\3c&HF020A0&}{\t(0,150,\fscx115\fscy115\bord8\blur3)}"
    purple_inactive = r"{\1c&HFFFFFF&}{\3c&H000000&}{\fscx100\fscy100\bord2\blur0}"
    # The word-by-word mode uses alpha fades instead of zoom.
    alpha_invisible = r"{\alpha&HFF}"
    # Fade-in: from invisible (FF) to visible (00) over 150 ms.
    alpha_anim_in = r"{\alpha&HFF\t(0,150,\alpha&H00)}"
    alpha_visible_static = r"{\alpha&H00}"
    rtl_mark = "\u200f"  # RIGHT-TO-LEFT MARK, wraps each line for RTL rendering
    pos_tag = f"{{\\an{align_tag}}}{{\\pos({pos_x},{pos_y})}}"
    with open(path, "w", encoding="utf-8") as f:
        f.write(header)
        for seg in data.segments:
            if is_purple_mode and seg.words:
                words_list = seg.words
                # One Dialogue line per word: the full sentence is redrawn
                # each time with only word i styled as "active".
                for i in range(len(words_list)):
                    current_word_obj = words_list[i]
                    start_t = format_time(current_word_obj.start)
                    end_t = format_time(current_word_obj.end)
                    line_parts = []
                    for j in range(len(words_list)):
                        w_txt = words_list[j].word.strip()
                        if i == j:
                            # Active word with zoom + colour animation.
                            line_parts.append(f"{purple_active_anim}{w_txt}")
                        else:
                            # Inactive words keep the plain style.
                            line_parts.append(f"{purple_inactive}{w_txt}")
                    # Words were appended in reading order; reverse for RTL layout.
                    line_parts.reverse()
                    final_text = " ".join(line_parts).replace('\n', '\\N')
                    f.write(f"Dialogue: 0,{start_t},{end_t},Default,,0,0,0,,{pos_tag}{rtl_mark}{final_text}{rtl_mark}\n")
            elif is_progressive_mode and seg.words:
                words_list = seg.words
                for i in range(len(words_list)):
                    current_word_obj = words_list[i]
                    start_t = format_time(current_word_obj.start)
                    # Each line lasts until the next word starts
                    # (or until the segment ends, for the last word).
                    if i + 1 < len(words_list):
                        end_t = format_time(words_list[i+1].start)
                    else:
                        end_t = format_time(seg.end)
                    line_parts = []
                    for j, w in enumerate(words_list):
                        clean_w = w.word.strip()
                        if j == i:
                            # The new word appears with a fade-in animation.
                            line_parts.append(f"{alpha_anim_in}{clean_w}")
                        elif j < i:
                            # Already-revealed words stay fully visible.
                            line_parts.append(f"{alpha_visible_static}{clean_w}")
                        else:
                            # Future words are kept invisible.
                            line_parts.append(f"{alpha_invisible}{clean_w}")
                    line_parts.reverse()  # RTL ordering, as above
                    full_line_text = " ".join(line_parts).replace('\n', '\\N')
                    f.write(f"Dialogue: 0,{start_t},{end_t},Default,,0,0,0,,{pos_tag}{rtl_mark}{full_line_text}{rtl_mark}\n")
            else:
                # Classic mode: one static line per segment.  ZWNJ (U+200C)
                # is replaced with a space — presumably for renderer
                # compatibility; TODO confirm.
                clean_text = seg.text.strip().replace('\u200c', ' ').replace('\n', '\\N')
                start = format_time(seg.start)
                end = format_time(seg.end)
                f.write(f"Dialogue: 0,{start},{end},Default,,0,0,0,,{pos_tag}{rtl_mark}{clean_text}{rtl_mark}\n")
@app.get("/")
async def index():
    """Serve the single-page frontend shipped alongside this app."""
    page = "index.html"
    return FileResponse(page)
@app.post("/api/upload")
async def upload(file: UploadFile = File(...)):
    """Upload a video, transcribe its Persian audio with Gemini, return timed segments.

    Saves the file under TEMP_DIR, probes its dimensions, uploads it to the
    Gemini Files API, requests a JSON transcript, splits long segments,
    applies a small sync offset, and estimates per-word timings for the
    karaoke/progressive modes.
    """
    GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")
    if not GEMINI_API_KEY:
        raise HTTPException(status_code=500, detail="کلید API تنظیم نشده است.")
    # Reject one specific hard-coded key value — presumably a leaked or
    # revoked key being blocklisted; TODO confirm intent.
    if GEMINI_API_KEY.startswith("AIzaSyDbLpnR8Ij-oVQqRITZh4541c3z4rDRKNI"):
        raise HTTPException(status_code=500, detail="کلید API نامعتبر است.")
    genai.configure(api_key=GEMINI_API_KEY)
    # Short random id ties together the upload, the .ass file, and renders.
    fid = str(uuid.uuid4())[:8]
    ext = file.filename.split('.')[-1]
    path = f"{TEMP_DIR}/{fid}.{ext}"
    with open(path, "wb") as f:
        shutil.copyfileobj(file.file, f)
    w, h = get_video_info(path)
    try:
        video_file = genai.upload_file(path=path)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"خطا در اتصال به گوگل: {str(e)}")
    # Poll until Google finishes server-side processing of the upload.
    # NOTE(review): no timeout — a file stuck in PROCESSING loops forever.
    while video_file.state.name == "PROCESSING":
        time.sleep(1)
        video_file = genai.get_file(video_file.name)
    if video_file.state.name == "FAILED":
        raise HTTPException(status_code=500, detail="پردازش ویدیو ناموفق بود.")
    model = genai.GenerativeModel(MODEL_NAME)
    # Transcription prompt (model is instructed to emit bare JSON).
    prompt = """
You are a professional subtitle synchronization engine.
Task: Transcribe the Persian (Farsi) audio from this video.
CRITICAL SYNCHRONIZATION RULES:
1. **START TIME**: Mark the timestamp EXACTLY when the first sound of the first word starts. Do NOT include silence before the word. Be aggressive with the start time.
2. **SHORT SEGMENTS**: Keep segments extremely short (Max 6 words).
3. **NO DELAY**: Do not wait for the sentence to finish processing. Timestamp the onset of speech.
Output ONLY valid JSON:
[
{"start": 0.00, "end": 1.25, "text": "سلام دوستان"},
{"start": 1.25, "end": 3.50, "text": "به این ویدیو خوش آمدید"}
]
"""
    try:
        response = model.generate_content(
            [video_file, prompt],
            generation_config={"response_mime_type": "application/json"}
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"خطا در مدل: {str(e)}")
    try:
        segments_raw = json.loads(response.text)
    except:
        # The model sometimes wraps the JSON in a markdown code fence;
        # strip the fence and retry before giving up.
        clean_text = response.text.replace("```json", "").replace("```", "").strip()
        try:
            segments_raw = json.loads(clean_text)
        except:
            raise HTTPException(status_code=500, detail="خطا در فرمت خروجی مدل.")
    processed_segments = split_long_segments(segments_raw, max_words=6)
    final_output = []
    # Audio sync: shift everything 150 ms earlier to counter perceived lag.
    SYNC_OFFSET = 0.15
    for i, s in enumerate(processed_segments):
        start_t = max(0.0, float(s.get("start", 0)) - SYNC_OFFSET)
        end_t = max(0.0, float(s.get("end", 0)) - SYNC_OFFSET)
        text_t = s.get("text", "").strip()
        # Evenly-spread word timings power the karaoke/progressive styles.
        words_data = estimate_word_timings(text_t, start_t, end_t)
        final_output.append({
            "id": i,
            "start": start_t,
            "end": end_t,
            "text": text_t,
            "words": words_data
        })
    return {"file_id": fid, "url": f"/temp/{fid}.{ext}", "width": w, "height": h, "segments": final_output}
@app.post("/api/render")
async def render(req: ProcessRequest):
    """Burn the styled subtitles into the uploaded video with ffmpeg.

    Writes <file_id>.ass, locates the original upload in TEMP_DIR, removes
    stale renders for this file_id, then re-encodes the video with the ass
    filter and returns the URL of the new mp4.

    Raises:
        HTTPException 404: no uploaded source video matches file_id.
        HTTPException 500: ffmpeg exited with a non-zero status.
    """
    ass_file = f"{TEMP_DIR}/{req.file_id}.ass"
    create_ass(req, ass_file)
    # The upload endpoint stores the source as <file_id>.<ext>; skip the
    # .ass we just wrote and any previous "_final_" render outputs.
    inp = None
    for f in os.listdir(TEMP_DIR):
        if f.startswith(req.file_id) and not f.endswith('.ass') and "_final_" not in f:
            inp = f"{TEMP_DIR}/{f}"
            break
    if inp is None:
        # Bug fix: the original passed inp=None straight into the ffmpeg
        # command, crashing with a TypeError instead of a clean 404.
        raise HTTPException(status_code=404, detail="ویدیوی اصلی یافت نشد.")
    # Drop stale renders so an old result can't be served by mistake.
    for f in os.listdir(TEMP_DIR):
        if f.startswith(f"{req.file_id}_final_"):
            try:
                os.remove(f"{TEMP_DIR}/{f}")
            except OSError:
                pass  # best-effort cleanup; a busy file is not fatal
    # Timestamped output name busts browser caches on re-render.
    timestamp = int(time.time())
    out_filename = f"{req.file_id}_final_{timestamp}.mp4"
    out_path = f"{TEMP_DIR}/{out_filename}"
    cmd = ["ffmpeg", "-y", "-i", inp, "-vf", f"ass={ass_file}",
           "-c:v", "libx264", "-preset", "ultrafast", "-crf", "26",
           "-c:a", "copy", out_path]
    proc = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    if proc.returncode != 0:
        # Bug fix: the original ignored ffmpeg failures and returned a URL
        # to a file that was never produced.
        raise HTTPException(status_code=500, detail="خطا در رندر ویدیو.")
    return {"url": f"/temp/{out_filename}"}