# Zirnavis49/app.py — Hugging Face Space source (commit 89acd2e, "Update app.py").
# NOTE: the original header lines here were page-scrape residue ("Elias207's picture",
# etc.) that made the file invalid Python; kept as a comment so the file parses.
import os
import shutil
import subprocess
import uuid
import json
import time
import asyncio
import random
import importlib.util
from datetime import datetime
from typing import List, Optional, Union, Dict
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
import google.generativeai as genai
from pydantic import BaseModel
from PIL import Image, ImageDraw, ImageFont
# Application setup: FastAPI with fully open CORS (any origin/method/header).
app = FastAPI()
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
# Working directories: uploads/render artifacts, static assets, and style plug-ins.
TEMP_DIR = "temp"
STATIC_DIR = "static"
STYLES_DIR = "styles"
os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs(STATIC_DIR, exist_ok=True)
os.makedirs(STYLES_DIR, exist_ok=True)
# Rendered videos under /temp are served directly back to the client.
app.mount("/temp", StaticFiles(directory="temp"), name="temp")
app.mount("/static", StaticFiles(directory="static"), name="static")
MODEL_NAME = "gemini-2.5-flash"  # Gemini model for transcription and style generation
FONT_DIR = "font"
# UI font key -> .ttf filename inside FONT_DIR.
# NOTE(review): "roboto" maps to a Vazirmatn file — presumably a deliberate
# fallback because no Roboto ttf ships with the app; confirm.
FONT_FILES_MAP = {
"vazir": "Vazirmatn.ttf", "lalezar": "Lalezar.ttf",
"bangers": "Bangers.ttf", "roboto": "Vazirmatn-Regular.ttf"
}
# --- Dynamic Style Loading System ---
loaded_styles = {}  # Map style ID -> imported style module
style_configs = {}  # Map style ID -> config dict exported by the module
style_templates = {}  # Map style ID -> frontend template string
def load_all_styles():
    """Import every style module found in STYLES_DIR and register it.

    Each ``.py`` file (except ``__init__.py``) is imported from its path. A
    module must expose a ``config`` dict whose ``"ids"`` list names the style
    ids it implements; an optional ``frontend_template`` string is registered
    for the frontend. Registrations go into the module-level ``loaded_styles``,
    ``style_configs`` and ``style_templates`` maps.

    Files are loaded in sorted order for deterministic registration, and a
    broken style file is logged and skipped so it cannot crash server startup.
    """
    print("--- Loading Styles from /styles ---")
    for filename in sorted(os.listdir(STYLES_DIR)):
        if not filename.endswith(".py") or filename == "__init__.py":
            continue
        module_name = filename[:-3]
        file_path = os.path.join(STYLES_DIR, filename)
        spec = importlib.util.spec_from_file_location(module_name, file_path)
        if not (spec and spec.loader):
            continue
        try:
            mod = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(mod)
        except Exception as e:
            # One bad style file must not prevent the app from starting.
            print(f"Failed to load style module {filename}: {e}")
            continue
        if hasattr(mod, 'config'):
            for style_id in mod.config.get("ids", []):
                loaded_styles[style_id] = mod
                style_configs[style_id] = mod.config
                if hasattr(mod, 'frontend_template'):
                    style_templates[style_id] = mod.frontend_template.strip()
                print(f"Loaded Style: {style_id}")
# Register all style plug-ins at import time.
load_all_styles()

# Collect Gemini API keys: the comma-separated list variable wins, with a
# single-key variable as fallback.
API_KEYS = [
    key.strip()
    for key in os.getenv("ALL_GEMINI_API_KEYS", "").split(",")
    if key.strip()
]
if not API_KEYS:
    fallback_key = os.getenv("GEMINI_API_KEY")
    if fallback_key:
        API_KEYS.append(fallback_key)
print(f"--- {len(API_KEYS)} Gemini Keys Detected ---")
class WordInfo(BaseModel):
    """A single timed word, optionally marked as highlighted with a custom color."""
    word: str
    start: float
    end: float
    highlight: Optional[bool] = False
    color: Optional[str] = None
class SubtitleSegment(BaseModel):
    """One subtitle line with absolute timing and optional per-word timings."""
    id: Union[str, int]
    start: float
    end: float
    text: str
    words: Optional[List[WordInfo]] = []
class StyleConfig(BaseModel):
    """Complete subtitle rendering style, plus transient per-frame render state."""
    font: str
    fontSize: int
    primaryColor: str
    outlineColor: str
    backType: str
    marginV: int
    x: Optional[int] = 0
    name: Optional[str] = "classic"
    radius: Optional[int] = 16
    paddingX: Optional[int] = 20
    paddingY: Optional[int] = 10
    # Mutated by the renderer via setattr() while generating frames:
    total_video_duration: Optional[float] = None
    current_render_time: Optional[float] = None
    entry_anim_progress: Optional[float] = 1.0
    # Per-style color overrides keyed by style id.
    styleBgColors: Dict[str, str] = {}
    styleColors: Dict[str, str] = {}
    styleActiveColors: Dict[str, str] = {}
class ProcessRequest(BaseModel):
    """Render-job payload: uploaded file id, its segments, dimensions and style."""
    file_id: str
    segments: List[SubtitleSegment]
    video_width: int
    video_height: int
    style: StyleConfig
class StylePrompt(BaseModel):
    """Free-text description used to ask Gemini for a style suggestion."""
    description: str
class JobStatus:
    """String constants describing a render job's lifecycle state."""
    QUEUED = "queued"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
class Job:
    """In-memory record of a queued render request and its outcome."""
    def __init__(self, job_id: str, request_data: ProcessRequest):
        self.id = job_id
        self.data = request_data
        self.status = JobStatus.QUEUED
        self.created_at = datetime.now()  # used for queue-position ordering
        self.result_url = None
        self.error_message = None
# FIFO queue of job IDs waiting to be rendered; consumed by queue_worker().
render_queue = asyncio.Queue()
# In-memory job registry (job_id -> Job); lost on process restart.
jobs_db: Dict[str, Job] = {}
async def queue_worker():
    """Background task: process render jobs from render_queue one at a time.

    Pulls job IDs forever, marks the job PROCESSING, renders it, and records
    either the result URL (COMPLETED) or the error message (FAILED).
    ``process_render_logic`` is synchronous and ffmpeg-heavy, so it is run in
    a worker thread to keep the event loop (and status polling) responsive.
    """
    print("--- Queue Worker Started ---")
    while True:
        job_id = await render_queue.get()
        job = jobs_db.get(job_id)
        if job:
            try:
                print(f"Processing job: {job_id}")
                job.status = JobStatus.PROCESSING
                # Off-load the blocking render so the event loop stays free.
                output_url = await asyncio.to_thread(process_render_logic, job.data)
                job.result_url = output_url
                job.status = JobStatus.COMPLETED
                print(f"Job {job_id} completed.")
            except Exception as e:
                print(f"Job {job_id} failed: {e}")
                job.status = JobStatus.FAILED
                job.error_message = str(e)
        render_queue.task_done()
@app.on_event("startup")
async def startup_event():
    """Launch the background render-queue worker when the server starts."""
    asyncio.create_task(queue_worker())
def get_video_info(path):
    """Probe a video file with ffprobe.

    Returns a ``(width, height, duration_seconds)`` tuple. If the stream entry
    lacks a duration (common for some containers), a second ffprobe call reads
    the format-level duration. On any failure — ffprobe missing, bad output,
    missing keys — the fallback ``(1080, 1920, 60.0)`` is returned.
    """
    try:
        cmd = ["ffprobe", "-v", "error", "-select_streams", "v:0",
               "-show_entries", "stream=width,height,duration", "-of", "json", path]
        res = subprocess.run(cmd, capture_output=True, text=True)
        data = json.loads(res.stdout)
        stream = data['streams'][0]
        w = int(stream.get('width', 1080))
        h = int(stream.get('height', 1920))
        dur = stream.get('duration')
        if not dur:
            # Some containers only report duration at the format level.
            cmd_dur = ["ffprobe", "-v", "error", "-show_entries", "format=duration",
                       "-of", "json", path]
            res_dur = subprocess.run(cmd_dur, capture_output=True, text=True)
            data_dur = json.loads(res_dur.stdout)
            dur = data_dur['format'].get('duration', 60)
        return w, h, float(dur)
    except (OSError, subprocess.SubprocessError, json.JSONDecodeError,
            KeyError, IndexError, TypeError, ValueError):
        # Narrowed from a bare except: covers missing ffprobe, unparsable
        # output, and absent keys, without swallowing e.g. KeyboardInterrupt.
        return 1080, 1920, 60.0
def get_font_object(style_font_name, size):
    """Resolve a UI font key to a PIL font object.

    Falls back to Vazirmatn.ttf for unknown keys or missing files, and to
    PIL's built-in default font if no ttf exists at all.
    """
    filename = FONT_FILES_MAP.get(style_font_name, "Vazirmatn.ttf")
    candidate = os.path.join(FONT_DIR, filename)
    if not os.path.exists(candidate):
        candidate = os.path.join(FONT_DIR, "Vazirmatn.ttf")
    if os.path.exists(candidate):
        return ImageFont.truetype(candidate, size)
    return ImageFont.load_default()
def get_color_tuple(color_str: str, default=(255, 255, 255, 255)):
    """Parse a CSS-style color string into an ``(r, g, b, a)`` tuple of 0-255 ints.

    Supports ``#rgb`` (CSS shorthand, newly added), ``#rrggbb``, ``#rrggbbaa``,
    ``rgb(r, g, b)`` and ``rgba(r, g, b, a)`` with alpha given as a 0-1 float.
    Anything unparsable (including non-string input) returns ``default``.
    """
    if not color_str or not isinstance(color_str, str):
        return default
    color_str = color_str.strip().lower()
    if color_str.startswith('#'):
        hex_val = color_str.lstrip('#')
        try:
            if len(hex_val) == 3:
                # CSS shorthand: #abc -> #aabbcc
                return tuple(int(c * 2, 16) for c in hex_val) + (255,)
            if len(hex_val) == 6:
                return tuple(int(hex_val[i:i+2], 16) for i in (0, 2, 4)) + (255,)
            if len(hex_val) == 8:
                return tuple(int(hex_val[i:i+2], 16) for i in (0, 2, 4, 6))
        except ValueError:
            pass
    elif color_str.startswith('rgba'):
        try:
            content = color_str[color_str.find('(') + 1:color_str.rfind(')')]
            parts = [x.strip() for x in content.split(',')]
            if len(parts) >= 4:
                r, g, b = int(parts[0]), int(parts[1]), int(parts[2])
                a = int(float(parts[3]) * 255)  # alpha arrives as a 0-1 float
                return (r, g, b, a)
        except ValueError:
            pass
    elif color_str.startswith('rgb'):
        try:
            content = color_str[color_str.find('(') + 1:color_str.rfind(')')]
            parts = [x.strip() for x in content.split(',')]
            if len(parts) >= 3:
                return (int(parts[0]), int(parts[1]), int(parts[2]), 255)
        except ValueError:
            pass
    return default
# --- Main Drawing Function (Refactored) ---
def create_subtitle_image(text_parts: list, active_idx: int, width: int, height: int, style: StyleConfig, word_infos: Optional[List[WordInfo]] = None):
    """Render one full-frame transparent RGBA subtitle image.

    text_parts: the words of the current segment, in display order.
    active_idx: index of the currently spoken word (-1 for none).
    word_infos: optional per-word timing/highlight info forwarded to the style.

    Wraps the words into lines, measures per-line and per-word pixel widths,
    then delegates the actual drawing to the loaded style module's
    ``draw_frame``; a red "Style Error" text is drawn if the style is missing.
    """
    img = Image.new('RGBA', (width, height), (0, 0, 0, 0))
    draw = ImageDraw.Draw(img)
    font = get_font_object(style.font, style.fontSize)
    # Text wrapping logic.
    lines = []
    # Music-player style keeps every word on a single line (no word limit).
    if style.name == "music_player":
        lines.append(text_parts)
    else:
        # All other styles: wrap at 5 words per line.
        MAX_WORDS_PER_LINE = 5
        current_line = []
        for i, word in enumerate(text_parts):
            current_line.append(word)
            if len(current_line) == MAX_WORDS_PER_LINE:
                lines.append(current_line)
                current_line = []
        if current_line:
            lines.append(current_line)
    # Pre-calculate line metrics (width, etc).
    line_metrics = []
    max_line_width = 0
    for line_words in lines:
        w_widths = []
        l_width = 0
        full_line_text = " ".join(line_words)
        # RTL/Persian shaped measurement needs libraqm support in Pillow;
        # fall back to a plain (unshaped) width measure if unavailable.
        try: l_width = draw.textlength(full_line_text, font=font, direction='rtl', language='fa')
        except: l_width = font.getlength(full_line_text)
        if l_width > max_line_width: max_line_width = l_width
        # We also need individual word widths for the styles
        for w in line_words:
            try: wl = draw.textlength(w, font=font, direction='rtl', language='fa')
            except: wl = font.getlength(w)
            w_widths.append(wl)
        line_metrics.append({"width": l_width, "words": line_words, "word_widths": w_widths})
    # --- Delegate to Style Module ---
    style_module = loaded_styles.get(style.name)
    if style_module and hasattr(style_module, 'draw_frame'):
        style_module.draw_frame(
            draw=draw,
            img=img,
            width=width,
            height=height,
            style_config=style,
            lines=lines,
            line_metrics=line_metrics,
            active_idx=active_idx,
            font=font,
            color_parser=get_color_tuple,
            word_infos=word_infos  # per-word timing/highlight info for the style
        )
    else:
        # Fallback if style not found (e.g. use classic logic inline or default)
        print(f"Warning: Style {style.name} not found, using default.")
        # Simple fallback text
        y = height - style.marginV
        draw.text((width/2, y), "Style Error", font=font, fill="red")
    return img
def generate_subtitle_video(data: ProcessRequest, temp_dir: str):
    """Emit all subtitle frames as PNGs plus an ffmpeg concat-demuxer list file.

    Walks the sorted segments along a running timeline. For each segment it
    writes one PNG per timed word (or one PNG every 0.05 s for the
    "music_player" style, which animates continuously), appending
    ``file '...'\\nduration ...`` entries to the list file. Gaps between
    segments are filled either with animated frames (music_player) or by
    holding the last generated image. Returns the path of the list file.
    """
    list_file = os.path.join(temp_dir, f"{data.file_id}_list.txt")
    empty_img_path = os.path.join(temp_dir, "empty.png")
    if not os.path.exists(empty_img_path): Image.new('RGBA', (data.video_width, data.video_height), (0, 0, 0, 0)).save(empty_img_path)
    # --- Compute total video duration (read by style modules via the style object) ---
    sorted_segments = sorted(data.segments, key=lambda x: x.start)
    if sorted_segments:
        setattr(data.style, 'total_video_duration', sorted_segments[-1].end)
    else:
        setattr(data.style, 'total_video_duration', 1.0)
    with open(list_file, "w") as f:
        current_timeline = 0.0
        last_generated_image = "empty.png"  # most recent frame; reused to fill gaps
        sorted_segments = sorted(data.segments, key=lambda x: x.start)
        for idx, seg in enumerate(sorted_segments):
            start_time = round(max(seg.start, current_timeline), 3)
            end_time = round(max(seg.end, start_time + 0.1), 3)
            if end_time - start_time < 0.04: continue
            # --- Gap filling between the previous segment and this one ---
            gap = round(start_time - current_timeline, 3)
            if gap > 0.005:
                # Music-player style: render animated frames through the gap
                # while keeping the previous sentence's text on screen.
                if data.style.name == "music_player":
                    # Find the previous sentence's words.
                    if idx > 0:
                        prev_seg = sorted_segments[idx-1]
                        text_to_show = [w.word for w in prev_seg.words] if prev_seg.words else prev_seg.text.split()
                    else:
                        text_to_show = []  # nothing to show before the first sentence starts
                    gap_cursor = current_timeline
                    GAP_FPS = 0.05  # NOTE: despite the name, this is a frame *duration* in seconds
                    while gap_cursor < start_time:
                        setattr(data.style, 'current_render_time', gap_cursor)
                        gap_name = f"sub_gap_{data.file_id}_{int(gap_cursor*1000)}.png"
                        # Pass the previous sentence's text (text_to_show) instead of [].
                        img = create_subtitle_image(text_to_show, -1, data.video_width, data.video_height, data.style)
                        img.save(os.path.join(temp_dir, gap_name))
                        f.write(f"file '{gap_name}'\nduration {GAP_FPS:.3f}\n")
                        gap_cursor += GAP_FPS
                        last_generated_image = gap_name
                    current_timeline = start_time
                else:
                    # Other styles: hold one static image for the whole gap.
                    if last_generated_image != "empty.png":
                        fill_img = last_generated_image
                    else:
                        fill_img = "empty.png"
                    f.write(f"file '{fill_img}'\nduration {gap:.3f}\n")
                    current_timeline += gap
            current_timeline = start_time
            available_duration = round(end_time - current_timeline, 3)
            words = [w.word for w in seg.words] if seg.words else seg.text.split()
            if seg.words and len(words) > 0:
                seg.words.sort(key=lambda x: x.start)
                words = [w.word for w in seg.words]
                # --- Music-player style: frame-by-frame rendering for smooth motion ---
                if data.style.name == "music_player":
                    SUB_FRAME_DURATION = 0.05
                    time_cursor = start_time
                    ANIMATION_DURATION = 0.4
                    while time_cursor < end_time:
                        active_word_index = -1
                        for i, w_info in enumerate(seg.words):
                            if time_cursor >= w_info.start and time_cursor < w_info.end:
                                active_word_index = i
                                break
                        setattr(data.style, 'current_render_time', time_cursor)
                        # Entry animation: progress ramps 0 -> 1 over the segment's first 0.4 s.
                        time_into_segment = time_cursor - start_time
                        anim_progress = min(1.0, time_into_segment / ANIMATION_DURATION)
                        setattr(data.style, 'entry_anim_progress', anim_progress)
                        name = f"sub_{data.file_id}_{idx}_{int(time_cursor*1000)}.png"
                        img = create_subtitle_image(words, active_word_index, data.video_width, data.video_height, data.style, word_infos=seg.words)
                        img.save(os.path.join(temp_dir, name))
                        # Remember the last written frame so it can fill the next gap.
                        last_generated_image = name
                        f.write(f"file '{name}'\nduration {SUB_FRAME_DURATION:.3f}\n")
                        time_cursor += SUB_FRAME_DURATION
                    current_timeline = end_time
                else:
                    # --- Other styles: one frame per word, durations scaled to fit the segment ---
                    word_files, total_word_raw_duration = [], 0
                    for i, w_info in enumerate(seg.words):
                        name = f"sub_{data.file_id}_{idx}_{i}.png"
                        img = create_subtitle_image(words, i, data.video_width, data.video_height, data.style, word_infos=seg.words)
                        img.save(os.path.join(temp_dir, name))
                        raw_dur = max(0.04, w_info.end - w_info.start)
                        word_files.append({"file": name, "dur": raw_dur})
                        total_word_raw_duration += raw_dur
                    scale_factor = available_duration / total_word_raw_duration if total_word_raw_duration > 0 else 1
                    accumulated_written = 0.0
                    for wf in word_files:
                        final_dur = max(0.01, round(wf["dur"] * scale_factor, 3))
                        f.write(f"file '{wf['file']}'\nduration {final_dur:.3f}\n")
                        accumulated_written += final_dur
                        # Keep the last image (in other styles the gap is usually blank anyway).
                        last_generated_image = wf['file']
                    current_timeline += accumulated_written
            else:
                # No per-word timings: show the whole segment as a single frame.
                name = f"sub_{data.file_id}_{idx}_full.png"
                img = create_subtitle_image(words, -1, data.video_width, data.video_height, data.style, word_infos=seg.words)
                img.save(os.path.join(temp_dir, name))
                f.write(f"file '{name}'\nduration {available_duration:.3f}\n")
                last_generated_image = name
                current_timeline += available_duration
        # Trailing transparent padding so the overlay never ends before the video does.
        f.write(f"file 'empty.png'\nduration 30.0\n")
    return list_file
def process_render_logic(req: ProcessRequest) -> str:
    """Render the subtitle track for a request and merge it onto the source video.

    Two-pass ffmpeg pipeline:
      1. Concat the generated PNG frames (concat demuxer) into a transparent
         RGBA ``.mov`` subtitle track.
      2. Overlay that track on the original video and re-encode to h264.

    Returns the public URL path of the final mp4.
    Raises Exception (with ffmpeg stderr attached) when either pass fails, or
    when the input video is missing.
    """
    req.segments = [s for s in req.segments if s.end > s.start]
    req.segments.sort(key=lambda x: x.start)
    lst = generate_subtitle_video(req, TEMP_DIR)
    inp = f"{TEMP_DIR}/{req.file_id}.mp4"
    if not os.path.exists(inp):
        raise Exception("Input video not found")
    sub_video_path = f"{TEMP_DIR}/{req.file_id}_sub_render.mov"
    out = f"{TEMP_DIR}/{req.file_id}_final_{int(time.time())}.mp4"
    cmd_step1 = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", lst,
                 "-r", "30", "-s", f"{req.video_width}x{req.video_height}",
                 "-c:v", "png", "-pix_fmt", "rgba", sub_video_path]
    res1 = subprocess.run(cmd_step1, capture_output=True, text=True)
    if res1.returncode != 0:
        raise Exception(f"Subtitle generation failed: {res1.stderr}")
    # "0:a?" maps audio only if the source actually has an audio stream, so
    # silent videos no longer make the merge step fail outright.
    cmd_step2 = ["ffmpeg", "-y", "-i", inp, "-i", sub_video_path,
                 "-filter_complex", "[0:v][1:v]overlay=0:0:eof_action=pass[outv]",
                 "-map", "[outv]", "-map", "0:a?", "-c:v", "libx264", "-r", "30",
                 "-preset", "ultrafast", "-c:a", "aac", out]
    res2 = subprocess.run(cmd_step2, capture_output=True, text=True)
    if res2.returncode != 0:
        raise Exception(f"Merge failed: {res2.stderr}")
    if os.path.exists(sub_video_path):
        os.remove(sub_video_path)
    return f"/temp/{os.path.basename(out)}"
@app.get("/")
async def index():
    """Serve the single-page frontend."""
    return FileResponse("index.html")
# --- New Endpoint for Styles ---
@app.get("/api/styles")
def get_style_definitions():
    """Expose every loaded style's config dict and frontend template."""
    return {"styles": style_configs, "templates": style_templates}
@app.post("/api/generate-style")
def generate_style_api(req: StylePrompt):
    """Ask Gemini to invent a subtitle style from a free-text description.

    Tries up to 3 times with randomly chosen API keys. Failures are logged
    (previously a bare ``except`` swallowed them silently); after all attempts
    a safe default style is returned instead of an error.
    """
    if not API_KEYS:
        raise HTTPException(500, "API Keys Missing")
    for _ in range(3):
        try:
            genai.configure(api_key=random.choice(API_KEYS))
            model = genai.GenerativeModel(MODEL_NAME)
            prompt = f"""You are a JSON generator. Create a subtitle style based on: "{req.description}". Return JSON only. Keys: primaryColor, outlineColor, backType (solid/transparent/outline), font (vazir/lalezar/bangers/roboto), fontSize (30-90)."""
            res = model.generate_content(prompt, generation_config={"response_mime_type": "application/json"})
            # Strip accidental markdown fences before parsing.
            data = json.loads(res.text.replace('```json', '').replace('```', '').strip())
            return {"primaryColor": data.get("primaryColor", "#FFFFFF"), "outlineColor": data.get("outlineColor", "#000000"), "backType": data.get("backType", "solid"), "font": data.get("font", "vazir"), "fontSize": int(data.get("fontSize", 60))}
        except Exception as e:
            print(f"Style generation attempt failed: {e}")
            continue
    # All attempts failed: fall back to a neutral default style.
    return {"primaryColor":"#FFFFFF", "outlineColor":"#000000", "font":"vazir", "fontSize":60, "backType":"solid"}
@app.post("/api/upload")
def upload(file: UploadFile = File(...)):
    """Ingest a video: normalize it, extract audio, and transcribe via Gemini.

    Pipeline:
      1. Save the upload, re-encode to 30 fps h264 mp4, extract mp3 audio.
      2. Upload the audio to Gemini and request a Persian transcript as JSON.
      3. Split each transcript segment into chunks of at most 9 words with
         linearly interpolated per-word timestamps, clamped to the real clip
         duration.

    The Gemini step is retried up to 50 times with randomly chosen API keys;
    returns file metadata plus the processed segments on success.
    """
    if not API_KEYS: raise HTTPException(500, "API Keys Missing")
    # NOTE(review): ext comes straight from the client filename; fid (a UUID
    # prefix) names the final files, so traversal seems contained — but confirm
    # ffmpeg tolerates arbitrary raw-file extensions.
    fid = str(uuid.uuid4())[:8]; ext = file.filename.split('.')[-1]
    raw_path, fixed_path, audio_path = f"{TEMP_DIR}/{fid}_raw.{ext}", f"{TEMP_DIR}/{fid}.mp4", f"{TEMP_DIR}/{fid}.mp3"
    try:
        with open(raw_path, "wb") as f: shutil.copyfileobj(file.file, f)
        # Normalize to 30 fps h264 so the later overlay render matches frame-for-frame.
        subprocess.run(["ffmpeg", "-y", "-i", raw_path, "-r", "30", "-c:v", "libx264", "-preset", "ultrafast", "-c:a", "copy", fixed_path], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        w, h, duration = get_video_info(fixed_path)
        # Send audio only to Gemini — much smaller than the full video.
        subprocess.run(["ffmpeg", "-y", "-i", fixed_path, "-vn", "-acodec", "libmp3lame", "-q:a", "4", audio_path], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        file_to_send = audio_path
    except Exception as e: raise HTTPException(500, f"File Processing Error: {e}")
    for _ in range(50):
        try:
            genai.configure(api_key=random.choice(API_KEYS))
            vf = genai.upload_file(path=file_to_send)
            # Poll until Gemini finishes ingesting the uploaded file.
            while vf.state.name == "PROCESSING": time.sleep(2); vf = genai.get_file(vf.name)
            if vf.state.name == "FAILED": raise Exception("Gemini Failed")
            model = genai.GenerativeModel(MODEL_NAME)
            prompt = f"The audio is {duration:.2f}s. Transcribe Persian speech to JSON. Timestamps MUST NOT exceed {duration:.2f}s. JSON: {{segments: [{{start, end, text, keywords}}], style_suggestion: {{...}}}}"
            res = model.generate_content([vf, prompt], generation_config={"response_mime_type": "application/json"})
            # Strip accidental markdown fences before parsing.
            data = json.loads(res.text.replace('```json', '').replace('```', '').strip())
            raw_segs = data.get("segments", []); final_segs = []
            if not raw_segs: raise Exception("Empty transcript")
            seg_cnt = 0
            for s in raw_segs:
                # Clamp model-reported timestamps into the actual clip duration.
                base_start, base_end = float(s.get("start", 0)), float(s.get("end", 0))
                if base_start >= duration: continue
                base_end = min(base_end, duration)
                if base_end <= base_start: base_end = base_start + 1.0
                raw_words = s.get("text", "").split()
                if not raw_words: continue
                full_dur = base_end - base_start
                total_wc = len(raw_words)
                if total_wc == 0: continue
                # Re-chunk into groups of up to 9 words with linearly
                # interpolated chunk and per-word timings.
                for k in range(0, total_wc, 9):
                    chunk = raw_words[k : k+9]
                    if not chunk: continue
                    c_start = round(base_start + (full_dur * (k / total_wc)), 3)
                    c_end = round(base_start + (full_dur * ((k + len(chunk)) / total_wc)), 3)
                    c_words = []
                    chunk_dur = c_end - c_start
                    for j, w in enumerate(chunk):
                        # Spread the chunk's duration evenly across its words.
                        w_s = round(c_start + (chunk_dur * j / len(chunk)), 3)
                        w_e = round(c_start + (chunk_dur * (j + 1) / len(chunk)), 3)
                        c_words.append({
                            "word": w,
                            "start": w_s,
                            "end": w_e,
                            "highlight": w in s.get("keywords", [])
                        })
                    final_segs.append({
                        "id": seg_cnt,
                        "start": c_start,
                        "end": c_end,
                        "text": " ".join(chunk),
                        "words": c_words
                    })
                    seg_cnt += 1
            # Best-effort remote/local cleanup before responding.
            try: genai.delete_file(vf.name)
            except: pass
            if os.path.exists(audio_path): os.remove(audio_path)
            if os.path.exists(raw_path): os.remove(raw_path)
            return {"file_id": fid, "url": f"/temp/{fid}.mp4", "width": w, "height": h, "segments": final_segs, "suggested_style": data.get("style_suggestion")}
        except Exception as e: print(e); continue
    raise HTTPException(500, "Failed after 50 attempts")
@app.post("/api/reupload")
async def reupload_video(file: UploadFile = File(...), file_id: str = Form(...)):
    """Restore the mp4 for an existing file_id (e.g. after the temp dir was purged)."""
    # Reject path separators so file_id cannot escape TEMP_DIR.
    if not file_id or '/' in file_id or '\\' in file_id:
        raise HTTPException(400, "Invalid file_id")
    destination = os.path.join(TEMP_DIR, f"{file_id}.mp4")
    try:
        with open(destination, "wb") as out_fh:
            shutil.copyfileobj(file.file, out_fh)
    except Exception as e:
        raise HTTPException(500, f"Could not save file: {e}")
    finally:
        # Always release the upload's temp resources.
        await file.close()
    return {"status": "success", "message": f"File {file_id}.mp4 restored."}
@app.post("/api/enqueue-render")
async def enqueue_render(req: ProcessRequest):
    """Register a render job and enqueue it for the background worker."""
    source_video = os.path.join(TEMP_DIR, f"{req.file_id}.mp4")
    if not os.path.exists(source_video):
        # Returns HTTP 200 with an error payload; the client presumably keys
        # off error_code (e.g. to trigger /api/reupload) — confirm in frontend.
        return JSONResponse(status_code=200, content={"error": "Video not found", "error_code": "VIDEO_NOT_FOUND"})
    job_id = str(uuid.uuid4())
    jobs_db[job_id] = Job(job_id, req)
    await render_queue.put(job_id)
    return {"job_id": job_id, "status": JobStatus.QUEUED}
@app.get("/api/job-status/{job_id}")
async def get_job_status(job_id: str):
    """Report a job's status plus queue position, result URL, or error message."""
    job = jobs_db.get(job_id)
    if not job:
        raise HTTPException(404, "Job not found")
    response = {"job_id": job.id, "status": job.status}
    if job.status == JobStatus.QUEUED:
        # 1-based position among queued jobs created before this one.
        ahead = [
            j for j in jobs_db.values()
            if j.status == JobStatus.QUEUED and j.created_at < job.created_at
        ]
        response["queue_position"] = len(ahead) + 1
    elif job.status == JobStatus.COMPLETED:
        response["url"] = job.result_url
    elif job.status == JobStatus.FAILED:
        response["error"] = job.error_message
    return response