# Zirnavis1 / app.py
# Elias207's picture
# Update app.py
# 07e8204 verified
import json
import os
import shutil
import subprocess
import time
import uuid
from datetime import timedelta
from typing import List, Optional

from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from faster_whisper import WhisperModel
from pydantic import BaseModel
app = FastAPI()

# CORS is fully open: any origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Working directory for uploads, generated .ass scripts and rendered videos.
# Its contents are exposed as static files under the /temp URL prefix, so the
# mount below uses the same constant instead of repeating the literal.
TEMP_DIR = "temp"
os.makedirs(TEMP_DIR, exist_ok=True)
app.mount("/temp", StaticFiles(directory=TEMP_DIR), name="temp")

# Whisper "small" model on CPU with int8 compute; instantiated once at import
# time and reused by every /api/upload call.
model = WhisperModel("small", device="cpu", compute_type="int8")
# Data models
class WordInfo(BaseModel):
    """One word of a subtitle segment together with its time span."""

    word: str  # word text; create_ass() strips surrounding whitespace before use
    start: float  # start time, passed to format_time() as seconds
    end: float  # end time, passed to format_time() as seconds
class SubtitleSegment(BaseModel):
    """A subtitle line: its time span, its text and, optionally, per-word timing."""

    id: int  # /api/upload assigns the segment's index in its response list
    start: float  # segment start time, passed to format_time() as seconds
    end: float  # segment end time, passed to format_time() as seconds
    text: str  # full segment text, used when no per-word rendering applies
    # Per-word timing; when empty or None, create_ass() renders `text` as a
    # single Dialogue event even for the karaoke style.
    words: Optional[List[WordInfo]] = []
class StyleConfig(BaseModel):
    """Visual settings that create_ass() turns into the ASS ``Default`` style."""

    font: str  # key into create_ass()'s font map; unknown keys fall back to "Vazirmatn"
    fontSize: int  # multiplied by 1.7 (and truncated) to get the ASS font size
    primaryColor: str  # text colour; converted by hex_to_ass(), which expects 6 hex digits
    outlineColor: str  # outline (or solid box) colour; converted by hex_to_ass()
    backType: str  # 'solid' or 'transparent' select a box; anything else gives outline + shadow
    marginV: int  # written verbatim into the style's MarginV field
    name: Optional[str] = "classic"  # style name; "karaoke_purple" enables per-word highlighting
class ProcessRequest(BaseModel):
    """Request body of ``POST /api/render``."""

    file_id: str  # id returned by /api/upload; used to locate the video in TEMP_DIR
    segments: List[SubtitleSegment]  # subtitle lines to burn into the video
    video_width: int  # written to the ASS script as PlayResX
    video_height: int  # written to the ASS script as PlayResY
    style: StyleConfig  # appearance of the rendered subtitles
def get_video_info(path):
    """Return ``(width, height)`` of the first video stream of *path*.

    The dimensions are read with ``ffprobe``. If probing fails for any
    expected reason (ffprobe missing, unreadable file, no video stream,
    unparsable output) the portrait default ``(1080, 1920)`` is returned
    instead of raising.
    """
    cmd = ["ffprobe", "-v", "error", "-select_streams", "v:0",
           "-show_entries", "stream=width,height", "-of", "json", path]
    try:
        res = subprocess.run(cmd, capture_output=True, text=True)
        stream = json.loads(res.stdout)['streams'][0]
        return stream['width'], stream['height']
    except (OSError, ValueError, KeyError, IndexError, TypeError):
        # OSError: ffprobe not installed / not executable.
        # ValueError: empty or non-JSON output (json.JSONDecodeError).
        # KeyError / IndexError / TypeError: JSON without a usable video stream.
        # Anything else (e.g. KeyboardInterrupt) is deliberately not swallowed.
        return 1080, 1920
def hex_to_ass(hex_color, alpha="00"):
    """Convert an ``#RRGGBB`` colour into the ASS ``&HAABBGGRR`` notation.

    Args:
        hex_color: Colour as six hex digits, with or without leading ``#``.
        alpha: Two-character ASS alpha component placed in front of the
            colour ("00" is fully opaque).

    Returns:
        The ASS colour string. Anything that is not exactly six hex digits
        yields opaque white (``&H00FFFFFF``), so arbitrary request text
        (commas, line breaks, ...) can never leak into the ASS style line.
    """
    digits = hex_color.lstrip('#')
    is_hex = all(ch in "0123456789abcdefABCDEF" for ch in digits)
    if len(digits) != 6 or not is_hex:
        return "&H00FFFFFF"
    red, green, blue = digits[0:2], digits[2:4], digits[4:6]
    # ASS stores colour channels in blue-green-red order.
    return f"&H{alpha}{blue}{green}{red}"
def format_time(seconds):
    """Format a time offset in seconds as an ASS timestamp ``H:MM:SS.cc``.

    Fractions are truncated (not rounded) to centiseconds. Negative offsets
    are clamped to zero, because ASS timestamps cannot be negative and the
    naive arithmetic would otherwise emit values such as ``-1:59:59.50``.
    """
    td = timedelta(seconds=max(seconds, 0))
    # Work in whole centiseconds so no float arithmetic is involved.
    total_cs = (td.days * 86400 + td.seconds) * 100 + td.microseconds // 10000
    whole_seconds, cs = divmod(total_cs, 100)
    h, remainder = divmod(whole_seconds, 3600)
    m, s = divmod(remainder, 60)
    return f"{h:01d}:{m:02d}:{s:02d}.{cs:02d}"
def _escape_line_breaks(text):
    r"""Replace every CRLF, CR or LF in *text* with the ASS hard break ``\N``."""
    # A raw carriage return or line feed would terminate the Dialogue line
    # early, so all three newline conventions are normalised first.
    return text.replace('\r\n', '\n').replace('\r', '\n').replace('\n', '\\N')


def create_ass(data: ProcessRequest, path: str):
    """Write an ASS subtitle script for *data* to *path* (UTF-8).

    The script contains a single ``Default`` style derived from
    ``data.style`` and one or more Dialogue events per segment:

    * style name ``"karaoke_purple"`` and a segment with word timing: one
      event per word, showing the whole line in white with the current word
      in purple for that word's time span;
    * otherwise: one event per segment with the segment text.

    Args:
        data: Render request (style, segments, video resolution).
        path: Destination file; overwritten if it exists.
    """
    s = data.style
    font_map = {"vazir": "Vazirmatn", "lalezar": "Lalezar", "roboto": "Arial", "bangers": "Impact"}
    font = font_map.get(s.font, "Vazirmatn")

    # General defaults: plain outline, no box, no shadow.
    primary = hex_to_ass(s.primaryColor)
    outline_c = hex_to_ass(s.outlineColor)
    back_col = "&H00000000"
    border_style = 1
    outline_w = 2.0
    shadow = 0
    font_size = int(s.fontSize * 1.7)

    if s.name == "karaoke_purple":
        # Dedicated purple karaoke style: fixed font, white text and a thin
        # black outline for readability; backType is ignored here.
        font = "Lalezar"
        primary = hex_to_ass("#FFFFFF")
        outline_c = hex_to_ass("#000000")
        outline_w = 2
    elif s.backType == 'solid':
        # Opaque box (BorderStyle 3) filled with the outline colour.
        border_style = 3
        back_col = outline_c
    elif s.backType == 'transparent':
        # Box style with a half-transparent black back colour.
        border_style = 3
        back_col = "&H80000000"
        outline_c = "&H00000000"
    else:
        # Outline scaled with the font size, plus a drop shadow.
        outline_w = int(font_size * 0.04)
        shadow = 2

    header = f"""[Script Info]
ScriptType: v4.00+
PlayResX: {data.video_width}
PlayResY: {data.video_height}
WrapStyle: 2
ScaledBorderAndShadow: yes
YCbCr Matrix: TV.601
[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Default,{font},{font_size},{primary},&H000000FF,{outline_c},{back_col},1,0,0,0,100,100,0,0,{border_style},{outline_w},{shadow},2,10,10,{s.marginV},1
[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
    # Inline colour overrides; ASS colours are written in BGR order, so
    # F020A0 is the purple highlight and FFFFFF is white.
    purple_ass_code = r"{\c&HF020A0&}"
    white_ass_code = r"{\c&HFFFFFF&}"

    with open(path, "w", encoding="utf-8") as f:
        f.write(header)
        for seg in data.segments:
            if s.name == "karaoke_purple" and seg.words:
                words_text = [w.word.strip() for w in seg.words]
                # One Dialogue event per word: the full line is repeated
                # with only the active word coloured purple.
                for i, word_obj in enumerate(seg.words):
                    start_t = format_time(word_obj.start)
                    end_t = format_time(word_obj.end)
                    line = " ".join(
                        f"{purple_ass_code if j == i else white_ass_code}{w_txt}"
                        for j, w_txt in enumerate(words_text)
                    )
                    final_text = _escape_line_breaks(line)
                    f.write(f"Dialogue: 0,{start_t},{end_t},Default,,0,0,0,,{final_text}\n")
            else:
                # Plain mode: zero-width non-joiners become ordinary spaces.
                clean_text = _escape_line_breaks(seg.text.strip().replace('\u200c', ' '))
                start = format_time(seg.start)
                end = format_time(seg.end)
                f.write(f"Dialogue: 0,{start},{end},Default,,0,0,0,,{clean_text}\n")
@app.get("/")
async def index():
    """Serve the front-end page from the relative path ``index.html``."""
    page = FileResponse("index.html")
    return page
def _segment_from_words(words):
    """Build one segment dict spanning the given non-empty list of word dicts."""
    return {
        "start": words[0]['start'],
        "end": words[-1]['end'],
        "text": "".join(w['word'] for w in words).strip(),
        "words": list(words),
    }


@app.post("/api/upload")
async def upload(file: UploadFile = File(...)):
    """Store an uploaded video, transcribe it and return regrouped subtitles.

    The file is saved as ``TEMP_DIR/<file_id>.<ext>``, probed for its
    resolution and transcribed (Persian, with word timestamps). Words are
    regrouped into segments of at most six words, independent of the
    transcriber's own segment boundaries.

    Returns:
        A dict with ``file_id``, the static ``url`` of the stored file,
        ``width``, ``height`` and the list of ``segments`` (each with ``id``,
        ``start``, ``end``, ``text`` and ``words``).
    """
    fid = str(uuid.uuid4())[:8]

    # The client-supplied file name is untrusted: keep only a plain ASCII
    # alphanumeric extension so it can never contain path separators, and
    # fall back to "mp4" when the name is missing or has no usable extension.
    _, dot, tail = (file.filename or "").rpartition('.')
    ext = tail if dot and tail.isascii() and tail.isalnum() else "mp4"

    path = f"{TEMP_DIR}/{fid}.{ext}"
    with open(path, "wb") as f:
        shutil.copyfileobj(file.file, f)
    w, h = get_video_info(path)

    # Word-level transcription.
    segments_gen, _ = model.transcribe(path, language="fa", word_timestamps=True)

    MAX_WORDS_PER_SEGMENT = 6
    refined_segments = []
    current_words_bucket = []
    for seg in segments_gen:
        if not seg.words:
            # Flush pending words first so the output stays in time order.
            if current_words_bucket:
                refined_segments.append(_segment_from_words(current_words_bucket))
                current_words_bucket = []
            refined_segments.append({
                "start": seg.start, "end": seg.end, "text": seg.text.strip(), "words": []
            })
            continue
        for word in seg.words:
            current_words_bucket.append({"word": word.word, "start": word.start, "end": word.end})
            if len(current_words_bucket) >= MAX_WORDS_PER_SEGMENT:
                refined_segments.append(_segment_from_words(current_words_bucket))
                current_words_bucket = []
    if current_words_bucket:
        refined_segments.append(_segment_from_words(current_words_bucket))

    # Number the segments; "id" stays the first key of every entry.
    final_output = [{"id": i, **seg} for i, seg in enumerate(refined_segments)]
    return {"file_id": fid, "url": f"/temp/{fid}.{ext}", "width": w, "height": h, "segments": final_output}
@app.post("/api/render")
async def render(req: ProcessRequest):
    """Burn the requested subtitles into the uploaded video.

    Looks up the upload belonging to ``req.file_id`` in ``TEMP_DIR``, writes
    the ASS script next to it, deletes earlier renders of the same upload and
    runs ffmpeg to produce ``<file_id>_final_<timestamp>.mp4``.

    Returns:
        ``{"url": ...}`` pointing at the rendered file under ``/temp``.

    Raises:
        HTTPException: 400 if ``file_id`` is not an id issued by the upload
            endpoint (8 lowercase hex digits), 404 if no uploaded file exists
            for it, 500 if ffmpeg exits with an error.
    """
    file_id = req.file_id
    # file_id ends up in file paths and prefix matches, so only accept the
    # exact shape produced by /api/upload (first 8 characters of a uuid4).
    if len(file_id) != 8 or any(ch not in "0123456789abcdef" for ch in file_id):
        raise HTTPException(status_code=400, detail="Invalid file_id")

    inp = None
    stale_prefix = f"{file_id}_final_"
    for name in os.listdir(TEMP_DIR):
        if name.startswith(stale_prefix):
            # Best-effort removal of previous renders of this upload.
            try:
                os.remove(f"{TEMP_DIR}/{name}")
            except OSError:
                pass
        elif inp is None and name.startswith(f"{file_id}.") and not name.endswith('.ass'):
            inp = f"{TEMP_DIR}/{name}"
    if inp is None:
        raise HTTPException(status_code=404, detail="Uploaded file not found")

    ass_file = f"{TEMP_DIR}/{file_id}.ass"
    create_ass(req, ass_file)

    # The timestamp makes every render URL unique.
    timestamp = int(time.time())
    out_filename = f"{file_id}_final_{timestamp}.mp4"
    out_path = f"{TEMP_DIR}/{out_filename}"
    cmd = [
        "ffmpeg", "-y", "-i", inp,
        "-vf", f"ass={ass_file}",
        "-c:v", "libx264", "-preset", "ultrafast", "-crf", "26",
        "-c:a", "copy", out_path
    ]
    result = subprocess.run(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    if result.returncode != 0:
        # Do not hand out a URL for a file ffmpeg failed to produce.
        raise HTTPException(status_code=500, detail="Video rendering failed")
    return {"url": f"/temp/{out_filename}"}