# Zirnavis21 / app.py — updated by Elias207 (commit 0507fce, verified)
import os
import shutil
import subprocess
import uuid
import json
import time
import asyncio
import random
from datetime import datetime
from typing import List, Optional, Union, Dict
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
import google.generativeai as genai
from pydantic import BaseModel
from PIL import Image, ImageDraw, ImageFont
# FastAPI application with fully permissive CORS so any frontend origin can call the API.
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)
# Working directories: "temp" holds uploads and render output, "static" holds fixed assets.
TEMP_DIR = "temp"
STATIC_DIR = "static"
os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs(STATIC_DIR, exist_ok=True)
# Both directories are served over HTTP so rendered files can be downloaded by the client.
app.mount("/temp", StaticFiles(directory="temp"), name="temp")
app.mount("/static", StaticFiles(directory="static"), name="static")
# Gemini model used for both transcription and style generation.
MODEL_NAME = "gemini-3-flash-preview"
# Map of UI font keys to .ttf filenames stored in the local "font" directory.
FONT_DIR = "font"
FONT_FILES_MAP = {
    "vazir": "Vazirmatn.ttf", "lalezar": "Lalezar.ttf",
    "bangers": "Bangers.ttf", "roboto": "Roboto.ttf"
}
# API keys: comma-separated list in ALL_GEMINI_API_KEYS, falling back to the
# single GEMINI_API_KEY variable when the list is empty.
raw_keys = os.getenv("ALL_GEMINI_API_KEYS", "")
API_KEYS = [k.strip() for k in raw_keys.split(",") if k.strip()]
if not API_KEYS:
    single_key = os.getenv("GEMINI_API_KEY")
    if single_key: API_KEYS.append(single_key)
# (Persian log line) Prints how many Gemini API keys were detected.
print(f"--- تعداد {len(API_KEYS)} کلید جی‌مینای شناسایی شد ---")
class WordInfo(BaseModel):
    """Timing and styling info for a single subtitle word."""
    word: str
    start: float
    end: float
    highlight: Optional[bool] = False
    color: Optional[str] = None
class SubtitleSegment(BaseModel):
    """One subtitle segment with its text and optional per-word timings."""
    id: Union[str, int]
    start: float
    end: float
    text: str
    words: Optional[List[WordInfo]] = []
class StyleConfig(BaseModel):
    """Visual configuration for rendered subtitles (colors, font, box geometry)."""
    font: str
    fontSize: int
    primaryColor: str
    outlineColor: str
    backType: str
    marginV: int
    x: Optional[int] = 0
    name: Optional[str] = "classic"
    radius: Optional[int] = 16
    paddingX: Optional[int] = 20
    paddingY: Optional[int] = 10
class ProcessRequest(BaseModel):
    """Payload for a render job: target video id, subtitle segments, and style."""
    file_id: str
    segments: List[SubtitleSegment]
    video_width: int
    video_height: int
    style: StyleConfig
class StylePrompt(BaseModel):
    """Free-text description used to generate a subtitle style via Gemini."""
    description: str
class JobStatus:
    """String constants for the lifecycle states of a render job."""
    QUEUED = "queued"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"
class Job:
    """In-memory record of one queued render request."""
    def __init__(self, job_id: str, request_data: ProcessRequest):
        self.id = job_id
        self.data = request_data
        self.status = JobStatus.QUEUED
        self.created_at = datetime.now()
        self.result_url = None
        self.error_message = None
# FIFO queue of job ids awaiting rendering, and the in-memory job registry.
render_queue = asyncio.Queue()
jobs_db: Dict[str, Job] = {}
async def queue_worker():
    """Background task: consume job ids from render_queue and render them serially.

    The ffmpeg-based render (`process_render_logic`) is synchronous, so it is
    dispatched to the default thread-pool executor — running it inline would
    block the event loop (and with it every HTTP endpoint) for the whole
    duration of a render. Jobs still execute one at a time because the result
    is awaited before the next item is taken.
    """
    print("--- Queue Worker Started ---")
    loop = asyncio.get_running_loop()
    while True:
        job_id = await render_queue.get()
        job = jobs_db.get(job_id)
        if job:
            try:
                print(f"Processing job: {job_id}")
                job.status = JobStatus.PROCESSING
                # Run the blocking render in a worker thread; exceptions propagate here.
                output_url = await loop.run_in_executor(None, process_render_logic, job.data)
                job.result_url = output_url
                job.status = JobStatus.COMPLETED
                print(f"Job {job_id} completed.")
            except Exception as e:
                print(f"Job {job_id} failed: {e}")
                job.status = JobStatus.FAILED
                job.error_message = str(e)
        # Always acknowledge the dequeued item, even for unknown job ids,
        # so render_queue.join() can never hang.
        render_queue.task_done()
@app.on_event("startup")
async def startup_event():
    """Launch the background render-queue worker when the app starts."""
    asyncio.create_task(queue_worker())
def get_video_info(path):
    """Probe a video file with ffprobe and return (width, height, duration_seconds).

    Falls back to the container-level format section when the video stream
    itself carries no duration, and to the defaults (1080, 1920, 60.0) when
    probing fails entirely (missing ffprobe, unreadable file, bad JSON).
    """
    try:
        cmd = ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height,duration", "-of", "json", path]
        res = subprocess.run(cmd, capture_output=True, text=True)
        data = json.loads(res.stdout)
        stream = data['streams'][0]
        w = int(stream.get('width', 1080))
        h = int(stream.get('height', 1920))
        dur = stream.get('duration')
        if not dur:
            # Some containers only expose duration at the format level.
            cmd_dur = ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "json", path]
            res_dur = subprocess.run(cmd_dur, capture_output=True, text=True)
            data_dur = json.loads(res_dur.stdout)
            dur = data_dur['format'].get('duration', 60)
        return w, h, float(dur)
    except Exception:
        # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit are no
        # longer swallowed; probing stays best-effort with portrait defaults.
        return 1080, 1920, 60.0
def get_font_object(style_font_name, size):
    """Resolve a UI font key to a PIL font object.

    Falls back to Vazirmatn.ttf when the key is unknown or the mapped file is
    missing, and finally to Pillow's built-in default font.
    """
    filename = FONT_FILES_MAP.get(style_font_name, "Vazirmatn.ttf")
    path = os.path.join(FONT_DIR, filename)
    if not os.path.exists(path):
        path = os.path.join(FONT_DIR, "Vazirmatn.ttf")
    if os.path.exists(path):
        return ImageFont.truetype(path, size)
    return ImageFont.load_default()
def get_color_tuple(color_str: str, default=(255, 255, 255, 255)):
    """Parse a CSS-style color string into an (r, g, b, a) tuple of 0-255 ints.

    Supports "#rrggbb", "#rrggbbaa", "rgb(r,g,b)" and "rgba(r,g,b,a)" where the
    rgba alpha channel is a 0-1 float. Any unparsable or non-string input
    returns `default` unchanged.
    """
    if not color_str or not isinstance(color_str, str):
        return default
    color_str = color_str.strip().lower()
    if color_str.startswith('#'):
        try:
            hex_val = color_str.lstrip('#')
            if len(hex_val) == 6:
                return tuple(int(hex_val[i:i+2], 16) for i in (0, 2, 4)) + (255,)
            elif len(hex_val) == 8:
                return tuple(int(hex_val[i:i+2], 16) for i in (0, 2, 4, 6))
        except ValueError:
            # Narrowed from bare `except:`: only malformed hex digits fall through.
            pass
    elif color_str.startswith('rgba'):
        try:
            content = color_str[color_str.find('(')+1 : color_str.rfind(')')]
            parts = [x.strip() for x in content.split(',')]
            if len(parts) >= 4:
                r, g, b = int(parts[0]), int(parts[1]), int(parts[2])
                a = int(float(parts[3]) * 255)  # CSS alpha is 0-1; scale to 0-255
                return (r, g, b, a)
        except ValueError:
            pass
    elif color_str.startswith('rgb'):
        try:
            content = color_str[color_str.find('(')+1 : color_str.rfind(')')]
            parts = [x.strip() for x in content.split(',')]
            if len(parts) >= 3:
                return (int(parts[0]), int(parts[1]), int(parts[2]), 255)
        except ValueError:
            pass
    return default
def create_subtitle_image(text_parts: list, active_idx: int, width: int, height: int, style: StyleConfig, word_infos: Optional[List[WordInfo]] = None):
    """Render one full-frame transparent RGBA subtitle image.

    text_parts: all words of the segment; active_idx: index of the currently
    spoken word (-1 for a static frame); width/height: video dimensions.
    Text is laid out right-to-left (Persian) and bottom-anchored style.marginV
    pixels above the frame's bottom edge.
    """
    img = Image.new('RGBA', (width, height), (0, 0, 0, 0))
    draw = ImageDraw.Draw(img)
    font = get_font_object(style.font, style.fontSize)
    # Styles that draw a colored highlight box behind the active word.
    is_karaoke_style = style.name in ["karaoke_static", "auto_director", "karaoke_purple"]
    MAX_WORDS_PER_LINE = 5
    # 1. Group words into lines of up to 5, keeping each word's global index.
    lines = []
    current_line = []
    line_word_indices = []
    current_line_indices = []
    for i, word in enumerate(text_parts):
        current_line.append(word)
        current_line_indices.append(i)
        if len(current_line) == MAX_WORDS_PER_LINE:
            lines.append(current_line)
            line_word_indices.append(current_line_indices)
            current_line = []
            current_line_indices = []
    if current_line:
        lines.append(current_line)
        line_word_indices.append(current_line_indices)
    space_w = draw.textlength(" ", font=font)
    # 2. Measure each line and each word (RTL shaping when Pillow has raqm support).
    line_metrics = []
    max_line_width = 0
    for line_words in lines:
        w_widths = []
        l_width = 0
        full_line_text = " ".join(line_words)
        try:
            l_width = draw.textlength(full_line_text, font=font, direction='rtl', language='fa')
        except:
            # Pillow raises if built without libraqm; fall back to plain metrics.
            l_width = font.getlength(full_line_text)
        if l_width > max_line_width:
            max_line_width = l_width
        for w in line_words:
            try: wl = draw.textlength(w, font=font, direction='rtl', language='fa')
            except: wl = font.getlength(w)
            w_widths.append(wl)
        line_metrics.append({
            "width": l_width,
            "words": line_words,
            "word_widths": w_widths
        })
    # 3. Vertical placement: block of lines sits marginV pixels above the bottom.
    line_height_px = int(style.fontSize * 1.5)
    total_block_height = len(lines) * line_height_px
    bottom_reference = height - style.marginV
    start_y_of_block = bottom_reference - total_block_height
    # 4. Optional background box behind the whole text block (non-karaoke styles only).
    if not is_karaoke_style and style.name not in ["plain_white", "white_outline"] and style.backType in ['solid', 'transparent']:
        # Empirical vertical nudge depending on aspect ratio (portrait vs landscape).
        ratio = height / width
        box_center_y_adjustment = 0
        if ratio > 1.6: box_center_y_adjustment = 5
        elif ratio > 1.1: box_center_y_adjustment = 10
        else: box_center_y_adjustment = 15
        box_center_x = width / 2 + style.x
        box_width = max_line_width + (style.paddingX * 2)
        box_x1 = box_center_x - (box_width / 2)
        box_x2 = box_center_x + (box_width / 2)
        visual_top_correction = int(style.fontSize * 0.12)
        box_y1 = start_y_of_block - style.paddingY + box_center_y_adjustment + visual_top_correction
        box_y2 = start_y_of_block + total_block_height + style.paddingY + box_center_y_adjustment - (line_height_px * 0.3)
        fill_color_tuple = get_color_tuple(style.outlineColor, (0, 0, 0, 255))
        # "transparent" back type: force a semi-opaque box if the color is fully opaque.
        if style.backType == 'transparent' and fill_color_tuple[3] == 255:
            fill_color_tuple = (fill_color_tuple[0], fill_color_tuple[1], fill_color_tuple[2], 160)
        draw.rounded_rectangle([box_x1, box_y1, box_x2, box_y2], radius=style.radius, fill=fill_color_tuple)
    # 5. Lay out each line right-to-left: the cursor starts at the right edge of
    # the centered line and moves left one word at a time.
    current_line_y = start_y_of_block
    for line_idx, metrics in enumerate(line_metrics):
        start_x = (width + metrics["width"]) / 2 + style.x
        cursor_x = start_x
        text_y_pos = current_line_y + (line_height_px * 0.005)
        words_to_draw = []
        global_indices = line_word_indices[line_idx]
        for w_i, word in enumerate(metrics["words"]):
            w_len = metrics["word_widths"][w_i]
            word_x = cursor_x - w_len
            global_idx = global_indices[w_i]
            is_active = (global_idx == active_idx)
            words_to_draw.append({
                "text": word,
                "x": word_x,
                "y": text_y_pos,
                "width": w_len,
                "is_active": is_active,
                "global_idx": global_idx
            })
            cursor_x -= (w_len + space_w)
        VERTICAL_CORRECTION = int(style.fontSize * 0.22)
        # First pass: draw the highlight box behind the active word (karaoke styles).
        for item in words_to_draw:
            if item["is_active"] and is_karaoke_style:
                pad_x, pad_y = style.paddingX, style.paddingY
                box_color = (160, 32, 240, 255)  # default purple highlight
                clean_primary = get_color_tuple(style.primaryColor, (160, 32, 240, 255))
                if style.name == "auto_director":
                    # Alternate cyan/pink by word parity.
                    box_color = (0, 215, 255, 255) if item["global_idx"] % 2 == 0 else (255, 0, 128, 255)
                elif style.name == "karaoke_static":
                    box_color = clean_primary
                # NOTE (translated): the per-word box-color override that used to
                # live here (a try/except block) was intentionally removed; the
                # highlight box now always uses the style-level color.
                rect_y1 = item["y"] - int(pad_y * 0.7) + VERTICAL_CORRECTION
                rect_y2 = item["y"] + style.fontSize + int(pad_y * 0.7) + VERTICAL_CORRECTION
                draw.rounded_rectangle(
                    [item["x"] - pad_x, rect_y1, item["x"] + item["width"] + pad_x, rect_y2],
                    radius=style.radius, fill=box_color
                )
        # Second pass: draw the words themselves on top of any boxes.
        for i, item in enumerate(words_to_draw):
            # "progressive_write" only reveals words up to the active one.
            if style.name == "progressive_write" and active_idx != -1 and item["global_idx"] > active_idx:
                continue
            text_color, stroke_color, stroke_width = (255,255,255,255), (0,0,0,255), 0
            if style.name == "plain_white": text_color = (255,255,255,255)
            elif style.name == "white_outline":
                text_color, stroke_color, stroke_width = (255,255,255,255), (0,0,0,255), max(2, int(style.fontSize / 12))
            elif not is_karaoke_style:
                text_color = get_color_tuple(style.primaryColor, (255,255,255,255))
                stroke_color = get_color_tuple(style.outlineColor, (0,0,0,255))
                stroke_width = max(2, int(style.fontSize / 12)) if style.backType == 'outline' else 0
            # (translated) Per-word text-color override applies even in karaoke mode
            # (the old `if not is_karaoke_style` guard was removed).
            try:
                if word_infos and item["global_idx"] < len(word_infos):
                    w_obj = word_infos[item["global_idx"]]
                    if hasattr(w_obj, 'color') and w_obj.color:
                        text_color = get_color_tuple(w_obj.color, text_color)
            except: pass
            # (translated) Shift text slightly upward to align with the highlight box.
            draw.text((item["x"], item["y"] - int(style.fontSize * 0.05)), item["text"], font=font, fill=text_color, stroke_width=stroke_width, stroke_fill=stroke_color, direction='rtl', language='fa')
        current_line_y += line_height_px
    return img
def generate_subtitle_video(data: ProcessRequest, temp_dir: str):
    """Render subtitle frames as PNGs and write an ffmpeg concat list that
    assigns each frame its display duration.

    Returns the path to the concat list file. Gaps between segments are filled
    with a fully transparent "empty.png"; dynamic (karaoke-like) styles get one
    PNG per word, static styles get a single PNG per segment.
    """
    list_file = os.path.join(temp_dir, f"{data.file_id}_list.txt")
    empty_img_path = os.path.join(temp_dir, "empty.png")
    if not os.path.exists(empty_img_path): Image.new('RGBA', (data.video_width, data.video_height), (0, 0, 0, 0)).save(empty_img_path)
    with open(list_file, "w") as f:
        # Styles whose frame changes with each spoken word.
        is_dynamic = data.style.name in ["karaoke_static", "auto_director", "karaoke_purple", "progressive_write"]
        current_timeline = 0.0  # seconds of footage already written to the list
        sorted_segments = sorted(data.segments, key=lambda x: x.start)
        for idx, seg in enumerate(sorted_segments):
            # Clamp each segment so it never overlaps time already emitted,
            # and enforce a minimum 0.1s display window.
            start_time = round(max(seg.start, current_timeline), 3)
            end_time = round(max(seg.end, start_time + 0.1), 3)
            if end_time - start_time < 0.04: continue
            # Fill silence before this segment with the transparent frame.
            gap = round(start_time - current_timeline, 3)
            if gap > 0.005:
                f.write(f"file 'empty.png'\nduration {gap:.3f}\n")
                current_timeline += gap
            current_timeline = start_time
            available_duration = round(end_time - current_timeline, 3)
            words = [w.word for w in seg.words] if seg.words else seg.text.split()
            if seg.words and is_dynamic and len(words) > 0:
                # Dynamic path: one frame per word, scaled to fill the segment.
                seg.words.sort(key=lambda x: x.start)
                words = [w.word for w in seg.words]  # (translated) refresh word list after sorting
                word_files, total_word_raw_duration = [], 0
                for i, w_info in enumerate(seg.words):
                    name = f"sub_{data.file_id}_{idx}_{i}.png"
                    img = create_subtitle_image(words, i, data.video_width, data.video_height, data.style, seg.words)
                    img.save(os.path.join(temp_dir, name))
                    raw_dur = max(0.04, w_info.end - w_info.start)
                    word_files.append({"file": name, "dur": raw_dur})
                    total_word_raw_duration += raw_dur
                # Scale raw per-word durations so they exactly fit the segment window.
                scale_factor = available_duration / total_word_raw_duration if total_word_raw_duration > 0 else 1
                accumulated_written = 0.0
                for wf in word_files:
                    final_dur = max(0.01, round(wf["dur"] * scale_factor, 3))
                    f.write(f"file '{wf['file']}'\nduration {final_dur:.3f}\n")
                    accumulated_written += final_dur
                current_timeline += accumulated_written
            else:
                # Static path: a single frame shown for the whole segment.
                name = f"sub_{data.file_id}_{idx}_full.png"
                img = create_subtitle_image(words, -1, data.video_width, data.video_height, data.style, seg.words)
                img.save(os.path.join(temp_dir, name))
                f.write(f"file '{name}'\nduration {available_duration:.3f}\n")
                current_timeline += available_duration
            # Rounding can leave a sliver of the segment unfilled; hold the last frame.
            remaining_in_segment = round(end_time - current_timeline, 3)
            if remaining_in_segment > 0.005:
                last_used = f"sub_{data.file_id}_{idx}_{len(words)-1}.png" if (seg.words and is_dynamic and len(words)>0) else f"sub_{data.file_id}_{idx}_full.png"
                f.write(f"file '{last_used}'\nduration {remaining_in_segment:.3f}\n")
                current_timeline += remaining_in_segment
        # Trailing transparent padding; the overlay stops at the main video's end
        # anyway (eof_action=pass in the merge step).
        f.write(f"file 'empty.png'\nduration 30.0\n")
    return list_file
def process_render_logic(req: ProcessRequest) -> str:
    """Render subtitles for one job and burn them onto the source video.

    Two ffmpeg passes: the PNG concat list is first turned into a transparent
    subtitle-only video, which is then overlaid on the original. Returns the
    public URL path of the finished mp4. Raises on any ffmpeg failure.
    """
    req.segments = [s for s in req.segments if s.end > s.start]
    req.segments.sort(key=lambda x: x.start)
    concat_list = generate_subtitle_video(req, TEMP_DIR)
    source_path = f"{TEMP_DIR}/{req.file_id}.mp4"
    if not os.path.exists(source_path):
        raise Exception("Input video not found")
    overlay_path = f"{TEMP_DIR}/{req.file_id}_sub_render.mov"
    final_path = f"{TEMP_DIR}/{req.file_id}_final_{int(time.time())}.mp4"
    # Pass 1: concat list -> RGBA subtitle video (PNG codec preserves transparency).
    step1 = subprocess.run(
        ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", concat_list, "-r", "30",
         "-s", f"{req.video_width}x{req.video_height}", "-c:v", "png", "-pix_fmt", "rgba", overlay_path],
        capture_output=True, text=True)
    if step1.returncode != 0:
        raise Exception(f"Subtitle generation failed: {step1.stderr}")
    # Pass 2: overlay the subtitle video on the original, keeping its audio track.
    step2 = subprocess.run(
        ["ffmpeg", "-y", "-i", source_path, "-i", overlay_path, "-filter_complex",
         "[0:v][1:v]overlay=0:0:eof_action=pass[outv]", "-map", "[outv]", "-map", "0:a",
         "-c:v", "libx264", "-r", "30", "-preset", "ultrafast", "-c:a", "aac", final_path],
        capture_output=True, text=True)
    if step2.returncode != 0:
        raise Exception(f"Merge failed: {step2.stderr}")
    if os.path.exists(overlay_path):
        os.remove(overlay_path)
    return f"/temp/{os.path.basename(final_path)}"
@app.get("/")
async def index():
    """Serve the single-page frontend."""
    return FileResponse("index.html")
@app.post("/api/generate-style")
def generate_style_api(req: StylePrompt):
    """Ask Gemini to invent a subtitle style from a free-text description.

    Tries up to 3 times with randomly chosen API keys; if all attempts fail,
    a safe default style is returned instead of an error response.
    """
    if not API_KEYS: raise HTTPException(500, "API Keys Missing")
    for _ in range(3):
        try:
            genai.configure(api_key=random.choice(API_KEYS))
            model = genai.GenerativeModel(MODEL_NAME)
            prompt = f"""You are a JSON generator. Create a subtitle style based on: "{req.description}". Return JSON only. Keys: primaryColor, outlineColor, backType (solid/transparent/outline), font (vazir/lalezar/bangers/roboto), fontSize (30-90)."""
            res = model.generate_content(prompt, generation_config={"response_mime_type": "application/json"})
            # Strip markdown fences the model sometimes adds despite the JSON mime type.
            data = json.loads(res.text.replace('```json', '').replace('```', '').strip())
            return {"primaryColor": data.get("primaryColor", "#FFFFFF"), "outlineColor": data.get("outlineColor", "#000000"), "backType": data.get("backType", "solid"), "font": data.get("font", "vazir"), "fontSize": int(data.get("fontSize", 60))}
        except Exception as e:
            # Was a bare `except: continue` — log the failure and retry with
            # another key instead of silently swallowing every error.
            print(f"generate-style attempt failed: {e}")
            continue
    # All attempts failed: fall back to a neutral default style.
    return {"primaryColor":"#FFFFFF", "outlineColor":"#000000", "font":"vazir", "fontSize":60, "backType":"solid"}
@app.post("/api/upload")
def upload(file: UploadFile = File(...)):
    """Accept a video upload, normalize it, and transcribe it with Gemini.

    Pipeline: save raw upload -> re-encode to 30fps h264 mp4 -> extract an mp3
    track -> upload the audio to Gemini for Persian transcription -> split the
    returned segments into word-timed chunks. The Gemini step is retried up to
    50 times with randomly chosen API keys.
    """
    if not API_KEYS: raise HTTPException(500, "API Keys Missing")
    fid = str(uuid.uuid4())[:8]; ext = file.filename.split('.')[-1]
    raw_path, fixed_path, audio_path = f"{TEMP_DIR}/{fid}_raw.{ext}", f"{TEMP_DIR}/{fid}.mp4", f"{TEMP_DIR}/{fid}.mp3"
    try:
        with open(raw_path, "wb") as f: shutil.copyfileobj(file.file, f)
        # Normalize to constant 30fps h264 so later renders line up frame-exact.
        subprocess.run(["ffmpeg", "-y", "-i", raw_path, "-r", "30", "-c:v", "libx264", "-preset", "ultrafast", "-c:a", "copy", fixed_path], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        w, h, duration = get_video_info(fixed_path)
        # Only the audio is sent to Gemini (much smaller upload).
        subprocess.run(["ffmpeg", "-y", "-i", fixed_path, "-vn", "-acodec", "libmp3lame", "-q:a", "4", audio_path], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        file_to_send = audio_path
    except Exception as e: raise HTTPException(500, f"File Processing Error: {e}")
    for _ in range(50):
        try:
            genai.configure(api_key=random.choice(API_KEYS))
            vf = genai.upload_file(path=file_to_send)
            # Poll until Gemini finishes ingesting the uploaded audio.
            while vf.state.name == "PROCESSING": time.sleep(2); vf = genai.get_file(vf.name)
            if vf.state.name == "FAILED": raise Exception("Gemini Failed")
            model = genai.GenerativeModel(MODEL_NAME)
            prompt = f"The audio is {duration:.2f}s. Transcribe Persian speech to JSON. Timestamps MUST NOT exceed {duration:.2f}s. JSON: {{segments: [{{start, end, text, keywords}}], style_suggestion: {{...}}}}"
            res = model.generate_content([vf, prompt], generation_config={"response_mime_type": "application/json"})
            # --- (translated) start of corrected section: segment post-processing ---
            data = json.loads(res.text.replace('```json', '').replace('```', '').strip())
            raw_segs = data.get("segments", []); final_segs = []
            if not raw_segs: raise Exception("Empty transcript")
            seg_cnt = 0
            for s in raw_segs:
                base_start, base_end = float(s.get("start", 0)), float(s.get("end", 0))
                # Drop segments Gemini placed past the real audio; clamp the rest.
                if base_start >= duration: continue
                base_end = min(base_end, duration)
                if base_end <= base_start: base_end = base_start + 1.0
                raw_words = s.get("text", "").split()
                if not raw_words: continue
                full_dur = base_end - base_start
                total_wc = len(raw_words)
                if total_wc == 0: continue
                # (translated) Split words into chunks. NOTE(review): the step is 9
                # words per chunk but the original Persian comment said 10 —
                # TODO confirm the intended chunk size.
                for k in range(0, total_wc, 9):
                    chunk = raw_words[k : k+9]
                    if not chunk: continue
                    # Chunk boundaries interpolated linearly across the segment.
                    c_start = round(base_start + (full_dur * (k / total_wc)), 3)
                    c_end = round(base_start + (full_dur * ((k + len(chunk)) / total_wc)), 3)
                    c_words = []
                    chunk_dur = c_end - c_start
                    for j, w in enumerate(chunk):
                        # Per-word timings spread evenly across the chunk window.
                        w_s = round(c_start + (chunk_dur * j / len(chunk)), 3)
                        w_e = round(c_start + (chunk_dur * (j + 1) / len(chunk)), 3)
                        c_words.append({
                            "word": w,
                            "start": w_s,
                            "end": w_e,
                            "highlight": w in s.get("keywords", [])
                        })
                    final_segs.append({
                        "id": seg_cnt,
                        "start": c_start,
                        "end": c_end,
                        "text": " ".join(chunk),
                        "words": c_words
                    })
                    seg_cnt += 1
            # --- (translated) end of corrected section ---
            try: genai.delete_file(vf.name)
            except: pass
            # Clean up intermediates; the normalized mp4 stays for rendering later.
            if os.path.exists(audio_path): os.remove(audio_path)
            if os.path.exists(raw_path): os.remove(raw_path)
            return {"file_id": fid, "url": f"/temp/{fid}.mp4", "width": w, "height": h, "segments": final_segs, "suggested_style": data.get("style_suggestion")}
        except Exception as e: print(e); continue
    raise HTTPException(500, "Failed after 50 attempts")
@app.post("/api/reupload")
async def reupload_video(file: UploadFile = File(...), file_id: str = Form(...)):
    """Restore a previously-uploaded video under its original id.

    Rejects ids containing path separators to prevent path traversal.
    """
    if not file_id or '/' in file_id or '\\' in file_id:
        raise HTTPException(400, "Invalid file_id")
    destination = os.path.join(TEMP_DIR, f"{file_id}.mp4")
    try:
        with open(destination, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
    except Exception as e:
        raise HTTPException(500, f"Could not save file: {e}")
    finally:
        # Release the upload's temporary file handle in every case.
        await file.close()
    return {"status": "success", "message": f"File {file_id}.mp4 restored."}
@app.post("/api/enqueue-render")
async def enqueue_render(req: ProcessRequest):
    """Register a render job for the background worker and return its id."""
    source = os.path.join(TEMP_DIR, f"{req.file_id}.mp4")
    if not os.path.exists(source):
        # Deliberately a 200 with an error code so the frontend can trigger
        # its re-upload flow instead of treating this as a hard failure.
        return JSONResponse(status_code=200, content={"error": "Video not found", "error_code": "VIDEO_NOT_FOUND"})
    job_id = str(uuid.uuid4())
    jobs_db[job_id] = Job(job_id, req)
    await render_queue.put(job_id)
    return {"job_id": job_id, "status": JobStatus.QUEUED}
@app.get("/api/job-status/{job_id}")
async def get_job_status(job_id: str):
    """Report a job's state, plus its queue position, result URL, or error."""
    job = jobs_db.get(job_id)
    if not job:
        raise HTTPException(404, "Job not found")
    payload = {"job_id": job.id, "status": job.status}
    if job.status == JobStatus.QUEUED:
        # Position = number of queued jobs created earlier, plus one.
        ahead = sum(1 for j in jobs_db.values() if j.status == JobStatus.QUEUED and j.created_at < job.created_at)
        payload["queue_position"] = ahead + 1
    elif job.status == JobStatus.COMPLETED:
        payload["url"] = job.result_url
    elif job.status == JobStatus.FAILED:
        payload["error"] = job.error_message
    return payload