"""FastAPI service that burns word-level (karaoke) Persian subtitles into videos.

Pipeline: upload video -> extract audio -> Gemini transcription with word
timings -> client edits segments/style -> queued render job -> PIL-drawn
subtitle frames -> ffmpeg concat + overlay -> downloadable MP4 under /temp.
"""
import os
import shutil
import subprocess
import uuid
import json
import time
import asyncio
import random
from datetime import datetime
from typing import List, Optional, Union, Dict

from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.middleware.cors import CORSMiddleware
import google.generativeai as genai
from pydantic import BaseModel
from PIL import Image, ImageDraw, ImageFont

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

TEMP_DIR = "temp"
STATIC_DIR = "static"
os.makedirs(TEMP_DIR, exist_ok=True)
os.makedirs(STATIC_DIR, exist_ok=True)
app.mount("/temp", StaticFiles(directory="temp"), name="temp")
app.mount("/static", StaticFiles(directory="static"), name="static")

MODEL_NAME = "gemini-3-flash-preview"
FONT_DIR = "font"
# Maps the style's logical font name to a .ttf file inside FONT_DIR.
FONT_FILES_MAP = {
    "vazir": "Vazirmatn.ttf",
    "lalezar": "Lalezar.ttf",
    "bangers": "Bangers.ttf",
    "roboto": "Roboto.ttf",
}

# Gemini API keys: a comma-separated pool in ALL_GEMINI_API_KEYS, or a single
# GEMINI_API_KEY as fallback. A random key is picked per upstream call.
raw_keys = os.getenv("ALL_GEMINI_API_KEYS", "")
API_KEYS = [k.strip() for k in raw_keys.split(",") if k.strip()]
if not API_KEYS:
    single_key = os.getenv("GEMINI_API_KEY")
    if single_key:
        API_KEYS.append(single_key)
print(f"--- تعداد {len(API_KEYS)} کلید جی‌مینای شناسایی شد ---")


class WordInfo(BaseModel):
    # A single word with absolute start/end timestamps (seconds) in the video.
    word: str
    start: float
    end: float
    highlight: Optional[bool] = False
    color: Optional[str] = None  # optional per-word text color override


class SubtitleSegment(BaseModel):
    id: Union[str, int]
    start: float
    end: float
    text: str
    words: Optional[List[WordInfo]] = []


class StyleConfig(BaseModel):
    font: str
    fontSize: int
    primaryColor: str
    outlineColor: str
    backType: str  # 'solid' | 'transparent' | 'outline'
    marginV: int  # distance of the subtitle block from the bottom edge (px)
    x: Optional[int] = 0  # horizontal offset from center (px)
    name: Optional[str] = "classic"
    radius: Optional[int] = 16
    paddingX: Optional[int] = 20
    paddingY: Optional[int] = 10


class ProcessRequest(BaseModel):
    file_id: str
    segments: List[SubtitleSegment]
    video_width: int
    video_height: int
    style: StyleConfig


class StylePrompt(BaseModel):
    description: str


class JobStatus:
    """String constants for the render-job lifecycle."""
    QUEUED = "queued"
    PROCESSING = "processing"
    COMPLETED = "completed"
    FAILED = "failed"


class Job:
    """In-memory render job record (not persisted; lost on restart)."""

    def __init__(self, job_id: str, request_data: ProcessRequest):
        self.id = job_id
        self.data = request_data
        self.status = JobStatus.QUEUED
        self.created_at = datetime.now()
        self.result_url = None
        self.error_message = None


render_queue = asyncio.Queue()
jobs_db: Dict[str, Job] = {}


async def queue_worker():
    """Consume render jobs one at a time, updating each Job's status/result."""
    print("--- Queue Worker Started ---")
    while True:
        job_id = await render_queue.get()
        job = jobs_db.get(job_id)
        if job:
            try:
                print(f"Processing job: {job_id}")
                job.status = JobStatus.PROCESSING
                # fix: run the blocking PIL/ffmpeg pipeline in a thread so the
                # event loop (and the status endpoints) stay responsive.
                output_url = await asyncio.to_thread(process_render_logic, job.data)
                job.result_url = output_url
                job.status = JobStatus.COMPLETED
                print(f"Job {job_id} completed.")
            except Exception as e:
                print(f"Job {job_id} failed: {e}")
                job.status = JobStatus.FAILED
                job.error_message = str(e)
        # fix: acknowledge the queue item even when the job id is unknown,
        # otherwise render_queue.join() would hang forever.
        render_queue.task_done()


@app.on_event("startup")
async def startup_event():
    asyncio.create_task(queue_worker())


def get_video_info(path):
    """Return (width, height, duration_seconds) of a video via ffprobe.

    Falls back to (1080, 1920, 60.0) when probing fails for any reason.
    """
    try:
        cmd = ["ffprobe", "-v", "error", "-select_streams", "v:0",
               "-show_entries", "stream=width,height,duration", "-of", "json", path]
        res = subprocess.run(cmd, capture_output=True, text=True)
        data = json.loads(res.stdout)
        stream = data['streams'][0]
        w = int(stream.get('width', 1080))
        h = int(stream.get('height', 1920))
        dur = stream.get('duration')
        if not dur:
            # Some containers only expose duration at the format level.
            cmd_dur = ["ffprobe", "-v", "error", "-show_entries", "format=duration",
                       "-of", "json", path]
            res_dur = subprocess.run(cmd_dur, capture_output=True, text=True)
            data_dur = json.loads(res_dur.stdout)
            dur = data_dur['format'].get('duration', 60)
        return w, h, float(dur)
    except Exception:
        return 1080, 1920, 60.0


def get_font_object(style_font_name, size):
    """Load the TTF mapped to *style_font_name* at *size*, with fallbacks.

    Falls back to Vazirmatn.ttf, then to PIL's built-in bitmap font.
    """
    target_filename = FONT_FILES_MAP.get(style_font_name, "Vazirmatn.ttf")
    target_path = os.path.join(FONT_DIR, target_filename)
    if not os.path.exists(target_path):
        target_path = os.path.join(FONT_DIR, "Vazirmatn.ttf")
    if os.path.exists(target_path):
        return ImageFont.truetype(target_path, size)
    return ImageFont.load_default()


def get_color_tuple(color_str: str, default=(255, 255, 255, 255)):
    """Parse '#rrggbb', '#rrggbbaa', 'rgb(...)' or 'rgba(...)' into an RGBA tuple.

    Returns *default* for anything unparseable.
    """
    if not color_str or not isinstance(color_str, str):
        return default
    color_str = color_str.strip().lower()
    if color_str.startswith('#'):
        try:
            hex_val = color_str.lstrip('#')
            if len(hex_val) == 6:
                return tuple(int(hex_val[i:i + 2], 16) for i in (0, 2, 4)) + (255,)
            elif len(hex_val) == 8:
                return tuple(int(hex_val[i:i + 2], 16) for i in (0, 2, 4, 6))
        except Exception:
            pass
    elif color_str.startswith('rgba'):
        # Checked before 'rgb' since 'rgba(...)' also startswith 'rgb'.
        try:
            content = color_str[color_str.find('(') + 1: color_str.rfind(')')]
            parts = [x.strip() for x in content.split(',')]
            if len(parts) >= 4:
                r, g, b = int(parts[0]), int(parts[1]), int(parts[2])
                a = int(float(parts[3]) * 255)  # CSS alpha 0..1 -> 0..255
                return (r, g, b, a)
        except Exception:
            pass
    elif color_str.startswith('rgb'):
        try:
            content = color_str[color_str.find('(') + 1: color_str.rfind(')')]
            parts = [x.strip() for x in content.split(',')]
            if len(parts) >= 3:
                return (int(parts[0]), int(parts[1]), int(parts[2]), 255)
        except Exception:
            pass
    return default


def create_subtitle_image(text_parts: list, active_idx: int, width: int, height: int,
                          style: StyleConfig, word_infos: Optional[List[WordInfo]] = None):
    """Render one full-frame transparent RGBA subtitle image.

    text_parts: every word of the segment. active_idx marks the currently
    spoken word for karaoke styles (-1 = no active word). Layout is RTL
    (Persian); words wrap into lines of at most 5.
    """
    img = Image.new('RGBA', (width, height), (0, 0, 0, 0))
    draw = ImageDraw.Draw(img)
    font = get_font_object(style.font, style.fontSize)
    is_karaoke_style = style.name in ["karaoke_static", "auto_director", "karaoke_purple"]
    MAX_WORDS_PER_LINE = 5

    # 1. Group words into lines of 5, keeping each word's global index so the
    # active word can still be located after wrapping.
    lines = []
    current_line = []
    line_word_indices = []
    current_line_indices = []
    for i, word in enumerate(text_parts):
        current_line.append(word)
        current_line_indices.append(i)
        if len(current_line) == MAX_WORDS_PER_LINE:
            lines.append(current_line)
            line_word_indices.append(current_line_indices)
            current_line = []
            current_line_indices = []
    if current_line:
        lines.append(current_line)
        line_word_indices.append(current_line_indices)

    # 2. Measure each line and each word (RTL shaping needs libraqm support;
    # fall back to plain metrics when it is unavailable).
    space_w = draw.textlength(" ", font=font)
    line_metrics = []
    max_line_width = 0
    for line_words in lines:
        w_widths = []
        l_width = 0
        full_line_text = " ".join(line_words)
        try:
            l_width = draw.textlength(full_line_text, font=font, direction='rtl', language='fa')
        except Exception:
            l_width = font.getlength(full_line_text)
        if l_width > max_line_width:
            max_line_width = l_width
        for w in line_words:
            try:
                wl = draw.textlength(w, font=font, direction='rtl', language='fa')
            except Exception:
                wl = font.getlength(w)
            w_widths.append(wl)
        line_metrics.append({"width": l_width, "words": line_words, "word_widths": w_widths})

    line_height_px = int(style.fontSize * 1.5)
    total_block_height = len(lines) * line_height_px
    bottom_reference = height - style.marginV
    start_y_of_block = bottom_reference - total_block_height

    # 3. Background box for the classic (non-karaoke) styles.
    if not is_karaoke_style and style.name not in ["plain_white", "white_outline"] and style.backType in ['solid', 'transparent']:
        # Slightly different vertical nudges per aspect ratio (portrait vs landscape).
        ratio = height / width
        box_center_y_adjustment = 0
        if ratio > 1.6:
            box_center_y_adjustment = 5
        elif ratio > 1.1:
            box_center_y_adjustment = 10
        else:
            box_center_y_adjustment = 15
        box_center_x = width / 2 + style.x
        box_width = max_line_width + (style.paddingX * 2)
        box_x1 = box_center_x - (box_width / 2)
        box_x2 = box_center_x + (box_width / 2)
        visual_top_correction = int(style.fontSize * 0.12)
        box_y1 = start_y_of_block - style.paddingY + box_center_y_adjustment + visual_top_correction
        box_y2 = start_y_of_block + total_block_height + style.paddingY + box_center_y_adjustment - (line_height_px * 0.3)
        fill_color_tuple = get_color_tuple(style.outlineColor, (0, 0, 0, 255))
        if style.backType == 'transparent' and fill_color_tuple[3] == 255:
            # 'transparent' backType with a fully opaque color -> force ~63% alpha.
            fill_color_tuple = (fill_color_tuple[0], fill_color_tuple[1], fill_color_tuple[2], 160)
        draw.rounded_rectangle([box_x1, box_y1, box_x2, box_y2], radius=style.radius, fill=fill_color_tuple)

    # 4. Lay out and draw each line.
    current_line_y = start_y_of_block
    for line_idx, metrics in enumerate(line_metrics):
        # RTL layout: start at the right edge of the centered line, walk left.
        start_x = (width + metrics["width"]) / 2 + style.x
        cursor_x = start_x
        text_y_pos = current_line_y + (line_height_px * 0.005)
        words_to_draw = []
        global_indices = line_word_indices[line_idx]
        for w_i, word in enumerate(metrics["words"]):
            w_len = metrics["word_widths"][w_i]
            word_x = cursor_x - w_len
            global_idx = global_indices[w_i]
            is_active = (global_idx == active_idx)
            words_to_draw.append({"text": word, "x": word_x, "y": text_y_pos,
                                  "width": w_len, "is_active": is_active,
                                  "global_idx": global_idx})
            cursor_x -= (w_len + space_w)

        VERTICAL_CORRECTION = int(style.fontSize * 0.22)
        # Pass 1: highlight box behind the active word (karaoke styles only).
        for item in words_to_draw:
            if item["is_active"] and is_karaoke_style:
                pad_x, pad_y = style.paddingX, style.paddingY
                box_color = (160, 32, 240, 255)  # default purple
                clean_primary = get_color_tuple(style.primaryColor, (160, 32, 240, 255))
                if style.name == "auto_director":
                    # Alternate cyan / pink per word index.
                    box_color = (0, 215, 255, 255) if item["global_idx"] % 2 == 0 else (255, 0, 128, 255)
                elif style.name == "karaoke_static":
                    box_color = clean_primary
                # NOTE: the per-word color override for this highlight box was
                # deliberately removed; per-word colors now only affect text.
                rect_y1 = item["y"] - int(pad_y * 0.7) + VERTICAL_CORRECTION
                rect_y2 = item["y"] + style.fontSize + int(pad_y * 0.7) + VERTICAL_CORRECTION
                draw.rounded_rectangle(
                    [item["x"] - pad_x, rect_y1, item["x"] + item["width"] + pad_x, rect_y2],
                    radius=style.radius, fill=box_color
                )

        # Pass 2: the words themselves, on top of any boxes.
        for i, item in enumerate(words_to_draw):
            if style.name == "progressive_write" and active_idx != -1 and item["global_idx"] > active_idx:
                continue  # typewriter effect: hide words after the active one
            text_color, stroke_color, stroke_width = (255, 255, 255, 255), (0, 0, 0, 255), 0
            if style.name == "plain_white":
                text_color = (255, 255, 255, 255)
            elif style.name == "white_outline":
                text_color, stroke_color, stroke_width = (255, 255, 255, 255), (0, 0, 0, 255), max(2, int(style.fontSize / 12))
            elif not is_karaoke_style:
                text_color = get_color_tuple(style.primaryColor, (255, 255, 255, 255))
                stroke_color = get_color_tuple(style.outlineColor, (0, 0, 0, 255))
                stroke_width = max(2, int(style.fontSize / 12)) if style.backType == 'outline' else 0
            # Per-word text color override (applies even in karaoke styles).
            try:
                if word_infos and item["global_idx"] < len(word_infos):
                    w_obj = word_infos[item["global_idx"]]
                    if hasattr(w_obj, 'color') and w_obj.color:
                        text_color = get_color_tuple(w_obj.color, text_color)
            except Exception:
                pass
            # Nudge the text slightly up so it sits centered inside the box.
            draw.text((item["x"], item["y"] - int(style.fontSize * 0.05)), item["text"],
                      font=font, fill=text_color, stroke_width=stroke_width,
                      stroke_fill=stroke_color, direction='rtl', language='fa')
        current_line_y += line_height_px
    return img


def generate_subtitle_video(data: ProcessRequest, temp_dir: str):
    """Write subtitle PNG frames plus an ffmpeg concat list describing their timing.

    Returns the path of the concat list file. Dynamic (karaoke) styles get one
    frame per word; static styles get one frame per segment.
    """
    list_file = os.path.join(temp_dir, f"{data.file_id}_list.txt")
    empty_img_path = os.path.join(temp_dir, "empty.png")
    if not os.path.exists(empty_img_path):
        Image.new('RGBA', (data.video_width, data.video_height), (0, 0, 0, 0)).save(empty_img_path)
    with open(list_file, "w") as f:
        is_dynamic = data.style.name in ["karaoke_static", "auto_director", "karaoke_purple", "progressive_write"]
        current_timeline = 0.0
        sorted_segments = sorted(data.segments, key=lambda x: x.start)
        for idx, seg in enumerate(sorted_segments):
            # Clamp against the running timeline so segments never overlap.
            start_time = round(max(seg.start, current_timeline), 3)
            end_time = round(max(seg.end, start_time + 0.1), 3)
            if end_time - start_time < 0.04:
                continue
            # Fill any gap before this segment with a fully transparent frame.
            gap = round(start_time - current_timeline, 3)
            if gap > 0.005:
                f.write(f"file 'empty.png'\nduration {gap:.3f}\n")
                current_timeline += gap
            current_timeline = start_time
            available_duration = round(end_time - current_timeline, 3)
            words = [w.word for w in seg.words] if seg.words else seg.text.split()
            if seg.words and is_dynamic and len(words) > 0:
                seg.words.sort(key=lambda x: x.start)
                words = [w.word for w in seg.words]  # fix: refresh word list after sorting
                word_files, total_word_raw_duration = [], 0
                for i, w_info in enumerate(seg.words):
                    name = f"sub_{data.file_id}_{idx}_{i}.png"
                    img = create_subtitle_image(words, i, data.video_width, data.video_height, data.style, seg.words)
                    img.save(os.path.join(temp_dir, name))
                    raw_dur = max(0.04, w_info.end - w_info.start)
                    word_files.append({"file": name, "dur": raw_dur})
                    total_word_raw_duration += raw_dur
                # Rescale per-word durations so they exactly fill the segment.
                scale_factor = available_duration / total_word_raw_duration if total_word_raw_duration > 0 else 1
                accumulated_written = 0.0
                for wf in word_files:
                    final_dur = max(0.01, round(wf["dur"] * scale_factor, 3))
                    f.write(f"file '{wf['file']}'\nduration {final_dur:.3f}\n")
                    accumulated_written += final_dur
                current_timeline += accumulated_written
            else:
                name = f"sub_{data.file_id}_{idx}_full.png"
                img = create_subtitle_image(words, -1, data.video_width, data.video_height, data.style, seg.words)
                img.save(os.path.join(temp_dir, name))
                f.write(f"file '{name}'\nduration {available_duration:.3f}\n")
                current_timeline += available_duration
            # Hold the last frame if rounding left part of the segment uncovered.
            remaining_in_segment = round(end_time - current_timeline, 3)
            if remaining_in_segment > 0.005:
                last_used = f"sub_{data.file_id}_{idx}_{len(words)-1}.png" if (seg.words and is_dynamic and len(words) > 0) else f"sub_{data.file_id}_{idx}_full.png"
                f.write(f"file '{last_used}'\nduration {remaining_in_segment:.3f}\n")
                current_timeline += remaining_in_segment
        # Trailing transparent padding so the concat stream outlasts the video.
        f.write(f"file 'empty.png'\nduration 30.0\n")
    return list_file


def process_render_logic(req: ProcessRequest) -> str:
    """Render subtitles onto the uploaded video; returns the public /temp URL.

    Raises Exception with the ffmpeg stderr when either encode step fails.
    """
    req.segments = [s for s in req.segments if s.end > s.start]
    req.segments.sort(key=lambda x: x.start)
    lst = generate_subtitle_video(req, TEMP_DIR)
    inp = f"{TEMP_DIR}/{req.file_id}.mp4"
    if not os.path.exists(inp):
        raise Exception("Input video not found")
    sub_video_path = f"{TEMP_DIR}/{req.file_id}_sub_render.mov"
    out = f"{TEMP_DIR}/{req.file_id}_final_{int(time.time())}.mp4"
    # Step 1: render the concat list of PNGs into a transparent overlay video.
    cmd_step1 = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", lst,
                 "-r", "30", "-s", f"{req.video_width}x{req.video_height}",
                 "-c:v", "png", "-pix_fmt", "rgba", sub_video_path]
    res1 = subprocess.run(cmd_step1, capture_output=True, text=True)
    if res1.returncode != 0:
        raise Exception(f"Subtitle generation failed: {res1.stderr}")
    # Step 2: overlay onto the source. fix: "0:a?" maps audio only when the
    # stream exists, so silent videos no longer make the merge fail.
    cmd_step2 = ["ffmpeg", "-y", "-i", inp, "-i", sub_video_path,
                 "-filter_complex", "[0:v][1:v]overlay=0:0:eof_action=pass[outv]",
                 "-map", "[outv]", "-map", "0:a?", "-c:v", "libx264", "-r", "30",
                 "-preset", "ultrafast", "-c:a", "aac", out]
    res2 = subprocess.run(cmd_step2, capture_output=True, text=True)
    if res2.returncode != 0:
        raise Exception(f"Merge failed: {res2.stderr}")
    if os.path.exists(sub_video_path):
        os.remove(sub_video_path)
    return f"/temp/{os.path.basename(out)}"


@app.get("/")
async def index():
    return FileResponse("index.html")


@app.post("/api/generate-style")
def generate_style_api(req: StylePrompt):
    """Ask Gemini to invent a subtitle style from a free-text description.

    Retries up to 3 times with random API keys; returns a safe default style
    if all attempts fail.
    """
    if not API_KEYS:
        raise HTTPException(500, "API Keys Missing")
    for _ in range(3):
        try:
            genai.configure(api_key=random.choice(API_KEYS))
            model = genai.GenerativeModel(MODEL_NAME)
            prompt = f"""You are a JSON generator. Create a subtitle style based on: "{req.description}". Return JSON only.
Keys: primaryColor, outlineColor, backType (solid/transparent/outline), font (vazir/lalezar/bangers/roboto), fontSize (30-90)."""
            res = model.generate_content(prompt, generation_config={"response_mime_type": "application/json"})
            data = json.loads(res.text.replace('```json', '').replace('```', '').strip())
            return {"primaryColor": data.get("primaryColor", "#FFFFFF"),
                    "outlineColor": data.get("outlineColor", "#000000"),
                    "backType": data.get("backType", "solid"),
                    "font": data.get("font", "vazir"),
                    "fontSize": int(data.get("fontSize", 60))}
        except Exception:
            continue
    return {"primaryColor": "#FFFFFF", "outlineColor": "#000000", "font": "vazir", "fontSize": 60, "backType": "solid"}


@app.post("/api/upload")
def upload(file: UploadFile = File(...)):
    """Ingest a video: normalize to 30fps mp4, extract mp3, transcribe via Gemini.

    Returns file id, playback URL, dimensions, word-timed segments and a
    suggested style. Retries the Gemini step up to 50 times.
    """
    if not API_KEYS:
        raise HTTPException(500, "API Keys Missing")
    fid = str(uuid.uuid4())[:8]
    ext = file.filename.split('.')[-1]
    raw_path, fixed_path, audio_path = f"{TEMP_DIR}/{fid}_raw.{ext}", f"{TEMP_DIR}/{fid}.mp4", f"{TEMP_DIR}/{fid}.mp3"
    try:
        with open(raw_path, "wb") as f:
            shutil.copyfileobj(file.file, f)
        # Re-encode to a constant 30fps so overlay timing matches the render step.
        subprocess.run(["ffmpeg", "-y", "-i", raw_path, "-r", "30", "-c:v", "libx264",
                        "-preset", "ultrafast", "-c:a", "copy", fixed_path],
                       check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        w, h, duration = get_video_info(fixed_path)
        # Audio-only upload to Gemini is much smaller than the full video.
        subprocess.run(["ffmpeg", "-y", "-i", fixed_path, "-vn", "-acodec", "libmp3lame",
                        "-q:a", "4", audio_path],
                       check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        file_to_send = audio_path
    except Exception as e:
        raise HTTPException(500, f"File Processing Error: {e}")
    for _ in range(50):
        try:
            genai.configure(api_key=random.choice(API_KEYS))
            vf = genai.upload_file(path=file_to_send)
            while vf.state.name == "PROCESSING":
                time.sleep(2)
                vf = genai.get_file(vf.name)
            if vf.state.name == "FAILED":
                raise Exception("Gemini Failed")
            model = genai.GenerativeModel(MODEL_NAME)
            prompt = f"The audio is {duration:.2f}s. Transcribe Persian speech to JSON. Timestamps MUST NOT exceed {duration:.2f}s. JSON: {{segments: [{{start, end, text, keywords}}], style_suggestion: {{...}}}}"
            res = model.generate_content([vf, prompt], generation_config={"response_mime_type": "application/json"})
            data = json.loads(res.text.replace('```json', '').replace('```', '').strip())
            raw_segs = data.get("segments", [])
            final_segs = []
            if not raw_segs:
                raise Exception("Empty transcript")
            seg_cnt = 0
            for s in raw_segs:
                base_start, base_end = float(s.get("start", 0)), float(s.get("end", 0))
                if base_start >= duration:
                    continue
                base_end = min(base_end, duration)
                if base_end <= base_start:
                    base_end = base_start + 1.0
                raw_words = s.get("text", "").split()
                if not raw_words:
                    continue
                full_dur = base_end - base_start
                total_wc = len(raw_words)
                if total_wc == 0:
                    continue
                # Split long segments into chunks of at most 9 words, each with
                # word timings interpolated linearly from the segment bounds.
                for k in range(0, total_wc, 9):
                    chunk = raw_words[k: k + 9]
                    if not chunk:
                        continue
                    c_start = round(base_start + (full_dur * (k / total_wc)), 3)
                    c_end = round(base_start + (full_dur * ((k + len(chunk)) / total_wc)), 3)
                    c_words = []
                    chunk_dur = c_end - c_start
                    for j, w in enumerate(chunk):
                        w_s = round(c_start + (chunk_dur * j / len(chunk)), 3)
                        w_e = round(c_start + (chunk_dur * (j + 1) / len(chunk)), 3)
                        c_words.append({"word": w, "start": w_s, "end": w_e,
                                        "highlight": w in s.get("keywords", [])})
                    final_segs.append({"id": seg_cnt, "start": c_start, "end": c_end,
                                       "text": " ".join(chunk), "words": c_words})
                    seg_cnt += 1
            try:
                genai.delete_file(vf.name)
            except Exception:
                pass
            if os.path.exists(audio_path):
                os.remove(audio_path)
            if os.path.exists(raw_path):
                os.remove(raw_path)
            return {"file_id": fid, "url": f"/temp/{fid}.mp4", "width": w, "height": h,
                    "segments": final_segs, "suggested_style": data.get("style_suggestion")}
        except Exception as e:
            print(e)
            continue
    raise HTTPException(500, "Failed after 50 attempts")


@app.post("/api/reupload")
async def reupload_video(file: UploadFile = File(...), file_id: str = Form(...)):
    """Restore a previously-known video under its original file_id.

    file_id is validated against path separators to prevent path traversal.
    """
    if not file_id or '/' in file_id or '\\' in file_id:
        raise HTTPException(400, "Invalid file_id")
    target_path = os.path.join(TEMP_DIR, f"{file_id}.mp4")
    try:
        with open(target_path, "wb") as buffer:
            shutil.copyfileobj(file.file, buffer)
    except Exception as e:
        raise HTTPException(500, f"Could not save file: {e}")
    finally:
        await file.close()
    return {"status": "success", "message": f"File {file_id}.mp4 restored."}


@app.post("/api/enqueue-render")
async def enqueue_render(req: ProcessRequest):
    """Queue a render job; returns its id, or an error payload if the video is gone."""
    if not os.path.exists(os.path.join(TEMP_DIR, f"{req.file_id}.mp4")):
        # Intentionally a 200 with an error code so the client can trigger reupload.
        return JSONResponse(status_code=200, content={"error": "Video not found", "error_code": "VIDEO_NOT_FOUND"})
    job_id = str(uuid.uuid4())
    jobs_db[job_id] = Job(job_id, req)
    await render_queue.put(job_id)
    return {"job_id": job_id, "status": JobStatus.QUEUED}


@app.get("/api/job-status/{job_id}")
async def get_job_status(job_id: str):
    """Report a job's status, queue position, result URL or error message."""
    job = jobs_db.get(job_id)
    if not job:
        raise HTTPException(404, "Job not found")
    response = {"job_id": job.id, "status": job.status}
    if job.status == JobStatus.QUEUED:
        # Position = 1 + number of older jobs still waiting.
        response["queue_position"] = sum(
            1 for j in jobs_db.values()
            if j.status == JobStatus.QUEUED and j.created_at < job.created_at
        ) + 1
    elif job.status == JobStatus.COMPLETED:
        response["url"] = job.result_url
    elif job.status == JobStatus.FAILED:
        response["error"] = job.error_message
    return response