Spaces:
Sleeping
Sleeping
| import os | |
| import shutil | |
| import subprocess | |
| import uuid | |
| import json | |
| import time | |
| import asyncio | |
| import random | |
| import importlib.util | |
| from datetime import datetime | |
| from typing import List, Optional, Union, Dict | |
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import google.generativeai as genai | |
| from pydantic import BaseModel | |
| from PIL import Image, ImageDraw, ImageFont | |
| app = FastAPI() | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| TEMP_DIR = "temp" | |
| STATIC_DIR = "static" | |
| STYLES_DIR = "styles" | |
| os.makedirs(TEMP_DIR, exist_ok=True) | |
| os.makedirs(STATIC_DIR, exist_ok=True) | |
| os.makedirs(STYLES_DIR, exist_ok=True) | |
| app.mount("/temp", StaticFiles(directory="temp"), name="temp") | |
| app.mount("/static", StaticFiles(directory="static"), name="static") | |
| MODEL_NAME = "gemini-2.5-flash" | |
| FONT_DIR = "font" | |
| FONT_FILES_MAP = { | |
| "vazir": "Vazirmatn.ttf", "lalezar": "Lalezar.ttf", | |
| "bangers": "Bangers.ttf", "roboto": "Vazirmatn-Regular.ttf" | |
| } | |
| # --- Dynamic Style Loading System --- | |
| loaded_styles = {} # Map ID -> Module | |
| style_configs = {} # Map ID -> Config Dict | |
| style_templates = {} # Map ID -> Frontend Template String | |
| def load_all_styles(): | |
| print("--- Loading Styles from /styles ---") | |
| for filename in os.listdir(STYLES_DIR): | |
| if filename.endswith(".py") and filename != "__init__.py": | |
| module_name = filename[:-3] | |
| file_path = os.path.join(STYLES_DIR, filename) | |
| spec = importlib.util.spec_from_file_location(module_name, file_path) | |
| if spec and spec.loader: | |
| mod = importlib.util.module_from_spec(spec) | |
| spec.loader.exec_module(mod) | |
| if hasattr(mod, 'config'): | |
| ids = mod.config.get("ids", []) | |
| for style_id in ids: | |
| loaded_styles[style_id] = mod | |
| style_configs[style_id] = mod.config | |
| if hasattr(mod, 'frontend_template'): | |
| style_templates[style_id] = mod.frontend_template.strip() | |
| print(f"Loaded Style: {style_id}") | |
| # Load styles on startup | |
| load_all_styles() | |
| raw_keys = os.getenv("ALL_GEMINI_API_KEYS", "") | |
| API_KEYS = [k.strip() for k in raw_keys.split(",") if k.strip()] | |
| if not API_KEYS: | |
| single_key = os.getenv("GEMINI_API_KEY") | |
| if single_key: API_KEYS.append(single_key) | |
| print(f"--- {len(API_KEYS)} Gemini Keys Detected ---") | |
| class WordInfo(BaseModel): | |
| word: str; start: float; end: float | |
| highlight: Optional[bool] = False | |
| color: Optional[str] = None | |
| class SubtitleSegment(BaseModel): | |
| id: Union[str, int]; start: float; end: float; text: str | |
| words: Optional[List[WordInfo]] = [] | |
| class StyleConfig(BaseModel): | |
| font: str; fontSize: int; primaryColor: str; outlineColor: str | |
| backType: str; marginV: int | |
| x: Optional[int] = 0 | |
| name: Optional[str] = "classic" | |
| radius: Optional[int] = 16 | |
| paddingX: Optional[int] = 20 | |
| paddingY: Optional[int] = 10 | |
| total_video_duration: Optional[float] = None | |
| current_render_time: Optional[float] = None | |
| entry_anim_progress: Optional[float] = 1.0 | |
| styleBgColors: Dict[str, str] = {} # <--- این خط اضافه شود | |
| styleColors: Dict[str, str] = {} | |
| styleActiveColors: Dict[str, str] = {} | |
| class ProcessRequest(BaseModel): | |
| file_id: str; segments: List[SubtitleSegment] | |
| video_width: int; video_height: int; style: StyleConfig | |
| class StylePrompt(BaseModel): | |
| description: str | |
| class JobStatus: | |
| QUEUED = "queued"; PROCESSING = "processing" | |
| COMPLETED = "completed"; FAILED = "failed" | |
| class Job: | |
| def __init__(self, job_id: str, request_data: ProcessRequest): | |
| self.id = job_id; self.data = request_data; self.status = JobStatus.QUEUED | |
| self.created_at = datetime.now(); self.result_url = None; self.error_message = None | |
| render_queue = asyncio.Queue() | |
| jobs_db: Dict[str, Job] = {} | |
| async def queue_worker(): | |
| print("--- Queue Worker Started ---") | |
| while True: | |
| job_id = await render_queue.get() | |
| job = jobs_db.get(job_id) | |
| if job: | |
| try: | |
| print(f"Processing job: {job_id}") | |
| job.status = JobStatus.PROCESSING | |
| output_url = process_render_logic(job.data) | |
| job.result_url = output_url | |
| job.status = JobStatus.COMPLETED | |
| print(f"Job {job_id} completed.") | |
| except Exception as e: | |
| print(f"Job {job_id} failed: {e}") | |
| job.status = JobStatus.FAILED | |
| job.error_message = str(e) | |
| render_queue.task_done() | |
| async def startup_event(): asyncio.create_task(queue_worker()) | |
| def get_video_info(path): | |
| try: | |
| cmd = ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height,duration", "-of", "json", path] | |
| res = subprocess.run(cmd, capture_output=True, text=True) | |
| data = json.loads(res.stdout) | |
| stream = data['streams'][0] | |
| w = int(stream.get('width', 1080)); h = int(stream.get('height', 1920)); dur = stream.get('duration') | |
| if not dur: | |
| cmd_dur = ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "json", path] | |
| res_dur = subprocess.run(cmd_dur, capture_output=True, text=True) | |
| data_dur = json.loads(res_dur.stdout) | |
| dur = data_dur['format'].get('duration', 60) | |
| return w, h, float(dur) | |
| except: return 1080, 1920, 60.0 | |
| def get_font_object(style_font_name, size): | |
| target_filename = FONT_FILES_MAP.get(style_font_name, "Vazirmatn.ttf") | |
| target_path = os.path.join(FONT_DIR, target_filename) | |
| if not os.path.exists(target_path): target_path = os.path.join(FONT_DIR, "Vazirmatn.ttf") | |
| if os.path.exists(target_path): return ImageFont.truetype(target_path, size) | |
| return ImageFont.load_default() | |
| def get_color_tuple(color_str: str, default=(255, 255, 255, 255)): | |
| if not color_str or not isinstance(color_str, str): return default | |
| color_str = color_str.strip().lower() | |
| if color_str.startswith('#'): | |
| try: | |
| hex_val = color_str.lstrip('#') | |
| if len(hex_val) == 6: return tuple(int(hex_val[i:i+2], 16) for i in (0, 2, 4)) + (255,) | |
| elif len(hex_val) == 8: return tuple(int(hex_val[i:i+2], 16) for i in (0, 2, 4, 6)) | |
| except: pass | |
| elif color_str.startswith('rgba'): | |
| try: | |
| content = color_str[color_str.find('(')+1 : color_str.rfind(')')] | |
| parts = [x.strip() for x in content.split(',')] | |
| if len(parts) >= 4: | |
| r, g, b = int(parts[0]), int(parts[1]), int(parts[2]) | |
| a = int(float(parts[3]) * 255) | |
| return (r, g, b, a) | |
| except: pass | |
| elif color_str.startswith('rgb'): | |
| try: | |
| content = color_str[color_str.find('(')+1 : color_str.rfind(')')] | |
| parts = [x.strip() for x in content.split(',')] | |
| if len(parts) >= 3: return (int(parts[0]), int(parts[1]), int(parts[2]), 255) | |
| except: pass | |
| return default | |
| # --- Main Drawing Function (Refactored) --- | |
| def create_subtitle_image(text_parts: list, active_idx: int, width: int, height: int, style: StyleConfig, word_infos: Optional[List[WordInfo]] = None): | |
| img = Image.new('RGBA', (width, height), (0, 0, 0, 0)) | |
| draw = ImageDraw.Draw(img) | |
| font = get_font_object(style.font, style.fontSize) | |
| # Text Wrapping Logic | |
| lines = [] | |
| # اگر استایل موزیکال بود، همه کلمات در یک خط باشند (بدون محدودیت) | |
| if style.name == "music_player": | |
| lines.append(text_parts) | |
| else: | |
| # برای بقیه استایلها: محدودیت 5 کلمه | |
| MAX_WORDS_PER_LINE = 5 | |
| current_line = [] | |
| for i, word in enumerate(text_parts): | |
| current_line.append(word) | |
| if len(current_line) == MAX_WORDS_PER_LINE: | |
| lines.append(current_line) | |
| current_line = [] | |
| if current_line: | |
| lines.append(current_line) | |
| # Pre-calculate line metrics (width, etc) | |
| line_metrics = [] | |
| max_line_width = 0 | |
| for line_words in lines: | |
| w_widths = [] | |
| l_width = 0 | |
| full_line_text = " ".join(line_words) | |
| try: l_width = draw.textlength(full_line_text, font=font, direction='rtl', language='fa') | |
| except: l_width = font.getlength(full_line_text) | |
| if l_width > max_line_width: max_line_width = l_width | |
| # We also need individual word widths for the styles | |
| for w in line_words: | |
| try: wl = draw.textlength(w, font=font, direction='rtl', language='fa') | |
| except: wl = font.getlength(w) | |
| w_widths.append(wl) | |
| line_metrics.append({"width": l_width, "words": line_words, "word_widths": w_widths}) | |
| # --- Delegate to Style Module --- | |
| style_module = loaded_styles.get(style.name) | |
| # ... بخشی از کد ... | |
| if style_module and hasattr(style_module, 'draw_frame'): | |
| style_module.draw_frame( | |
| draw=draw, | |
| img=img, | |
| width=width, | |
| height=height, | |
| style_config=style, | |
| lines=lines, | |
| line_metrics=line_metrics, | |
| active_idx=active_idx, | |
| font=font, | |
| color_parser=get_color_tuple, | |
| word_infos=word_infos # <--- این خط اضافه شد | |
| ) | |
| else: | |
| # Fallback if style not found (e.g. use classic logic inline or default) | |
| print(f"Warning: Style {style.name} not found, using default.") | |
| # Simple fallback text | |
| y = height - style.marginV | |
| draw.text((width/2, y), "Style Error", font=font, fill="red") | |
| return img | |
| def generate_subtitle_video(data: ProcessRequest, temp_dir: str): | |
| list_file = os.path.join(temp_dir, f"{data.file_id}_list.txt") | |
| empty_img_path = os.path.join(temp_dir, "empty.png") | |
| if not os.path.exists(empty_img_path): Image.new('RGBA', (data.video_width, data.video_height), (0, 0, 0, 0)).save(empty_img_path) | |
| # --- محاسبه زمان کل ویدیو --- | |
| sorted_segments = sorted(data.segments, key=lambda x: x.start) | |
| if sorted_segments: | |
| setattr(data.style, 'total_video_duration', sorted_segments[-1].end) | |
| else: | |
| setattr(data.style, 'total_video_duration', 1.0) | |
| with open(list_file, "w") as f: | |
| current_timeline = 0.0 | |
| last_generated_image = "empty.png" # <--- این خط حتماً باید اضافه شود | |
| sorted_segments = sorted(data.segments, key=lambda x: x.start) | |
| for idx, seg in enumerate(sorted_segments): | |
| start_time = round(max(seg.start, current_timeline), 3) | |
| end_time = round(max(seg.end, start_time + 0.1), 3) | |
| if end_time - start_time < 0.04: continue | |
| # --- پر کردن فاصله خالی (Gap Filling) --- | |
| gap = round(start_time - current_timeline, 3) | |
| if gap > 0.005: | |
| # اگر استایل موزیکال است، فریمهای متحرک بساز + متن جمله قبلی را نگه دار | |
| if data.style.name == "music_player": | |
| # پیدا کردن متن جمله قبلی | |
| if idx > 0: | |
| prev_seg = sorted_segments[idx-1] | |
| text_to_show = [w.word for w in prev_seg.words] if prev_seg.words else prev_seg.text.split() | |
| else: | |
| text_to_show = [] # اگر هنوز جمله اول شروع نشده، متن خالی باشه | |
| gap_cursor = current_timeline | |
| GAP_FPS = 0.05 | |
| while gap_cursor < start_time: | |
| setattr(data.style, 'current_render_time', gap_cursor) | |
| gap_name = f"sub_gap_{data.file_id}_{int(gap_cursor*1000)}.png" | |
| # اینجا به جای []، متن جمله قبلی (text_to_show) را میفرستیم | |
| img = create_subtitle_image(text_to_show, -1, data.video_width, data.video_height, data.style) | |
| img.save(os.path.join(temp_dir, gap_name)) | |
| f.write(f"file '{gap_name}'\nduration {GAP_FPS:.3f}\n") | |
| gap_cursor += GAP_FPS | |
| last_generated_image = gap_name | |
| current_timeline = start_time | |
| else: | |
| # برای سایر استایلها همان منطق قبلی (تصویر ثابت) | |
| if last_generated_image != "empty.png": | |
| fill_img = last_generated_image | |
| else: | |
| fill_img = "empty.png" | |
| f.write(f"file '{fill_img}'\nduration {gap:.3f}\n") | |
| current_timeline += gap | |
| current_timeline = start_time | |
| available_duration = round(end_time - current_timeline, 3) | |
| words = [w.word for w in seg.words] if seg.words else seg.text.split() | |
| if seg.words and len(words) > 0: | |
| seg.words.sort(key=lambda x: x.start) | |
| words = [w.word for w in seg.words] | |
| # --- منطق استایل موزیکال (فریم به فریم برای روانی حرکت) --- | |
| if data.style.name == "music_player": | |
| SUB_FRAME_DURATION = 0.05 | |
| time_cursor = start_time | |
| ANIMATION_DURATION = 0.4 | |
| while time_cursor < end_time: | |
| active_word_index = -1 | |
| for i, w_info in enumerate(seg.words): | |
| if time_cursor >= w_info.start and time_cursor < w_info.end: | |
| active_word_index = i | |
| break | |
| setattr(data.style, 'current_render_time', time_cursor) | |
| # محاسبه انیمیشن | |
| time_into_segment = time_cursor - start_time | |
| anim_progress = min(1.0, time_into_segment / ANIMATION_DURATION) | |
| setattr(data.style, 'entry_anim_progress', anim_progress) | |
| name = f"sub_{data.file_id}_{idx}_{int(time_cursor*1000)}.png" | |
| img = create_subtitle_image(words, active_word_index, data.video_width, data.video_height, data.style, word_infos=seg.words) | |
| img.save(os.path.join(temp_dir, name)) | |
| # *** تغییر ۲: ذخیره نام آخرین عکس *** | |
| last_generated_image = name | |
| f.write(f"file '{name}'\nduration {SUB_FRAME_DURATION:.3f}\n") | |
| time_cursor += SUB_FRAME_DURATION | |
| current_timeline = end_time | |
| else: | |
| # --- منطق سایر استایلها --- | |
| word_files, total_word_raw_duration = [], 0 | |
| for i, w_info in enumerate(seg.words): | |
| name = f"sub_{data.file_id}_{idx}_{i}.png" | |
| img = create_subtitle_image(words, i, data.video_width, data.video_height, data.style, word_infos=seg.words) | |
| img.save(os.path.join(temp_dir, name)) | |
| raw_dur = max(0.04, w_info.end - w_info.start) | |
| word_files.append({"file": name, "dur": raw_dur}) | |
| total_word_raw_duration += raw_dur | |
| scale_factor = available_duration / total_word_raw_duration if total_word_raw_duration > 0 else 1 | |
| accumulated_written = 0.0 | |
| for wf in word_files: | |
| final_dur = max(0.01, round(wf["dur"] * scale_factor, 3)) | |
| f.write(f"file '{wf['file']}'\nduration {final_dur:.3f}\n") | |
| accumulated_written += final_dur | |
| # آخرین عکس را نگه میداریم (هرچند در استایلهای دیگر معمولاً گپ سیاه است) | |
| last_generated_image = wf['file'] | |
| current_timeline += accumulated_written | |
| else: | |
| # حالت بدون کلمات زمانی (کل سگمنت یکجا) | |
| name = f"sub_{data.file_id}_{idx}_full.png" | |
| img = create_subtitle_image(words, -1, data.video_width, data.video_height, data.style, word_infos=seg.words) | |
| img.save(os.path.join(temp_dir, name)) | |
| f.write(f"file '{name}'\nduration {available_duration:.3f}\n") | |
| last_generated_image = name | |
| current_timeline += available_duration | |
| f.write(f"file 'empty.png'\nduration 30.0\n") | |
| return list_file | |
| def process_render_logic(req: ProcessRequest) -> str: | |
| req.segments = [s for s in req.segments if s.end > s.start] | |
| req.segments.sort(key=lambda x: x.start) | |
| lst = generate_subtitle_video(req, TEMP_DIR) | |
| inp = f"{TEMP_DIR}/{req.file_id}.mp4" | |
| if not os.path.exists(inp): raise Exception("Input video not found") | |
| sub_video_path = f"{TEMP_DIR}/{req.file_id}_sub_render.mov" | |
| out = f"{TEMP_DIR}/{req.file_id}_final_{int(time.time())}.mp4" | |
| cmd_step1 = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", lst, "-r", "30", "-s", f"{req.video_width}x{req.video_height}", "-c:v", "png", "-pix_fmt", "rgba", sub_video_path] | |
| res1 = subprocess.run(cmd_step1, capture_output=True, text=True) | |
| if res1.returncode != 0: raise Exception(f"Subtitle generation failed: {res1.stderr}") | |
| cmd_step2 = ["ffmpeg", "-y", "-i", inp, "-i", sub_video_path, "-filter_complex", "[0:v][1:v]overlay=0:0:eof_action=pass[outv]", "-map", "[outv]", "-map", "0:a", "-c:v", "libx264", "-r", "30", "-preset", "ultrafast", "-c:a", "aac", out] | |
| res2 = subprocess.run(cmd_step2, capture_output=True, text=True) | |
| if res2.returncode != 0: raise Exception(f"Merge failed: {res2.stderr}") | |
| if os.path.exists(sub_video_path): os.remove(sub_video_path) | |
| return f"/temp/{os.path.basename(out)}" | |
| async def index(): return FileResponse("index.html") | |
| # --- New Endpoint for Styles --- | |
| def get_style_definitions(): | |
| return { | |
| "styles": style_configs, | |
| "templates": style_templates | |
| } | |
| def generate_style_api(req: StylePrompt): | |
| if not API_KEYS: raise HTTPException(500, "API Keys Missing") | |
| for _ in range(3): | |
| try: | |
| genai.configure(api_key=random.choice(API_KEYS)) | |
| model = genai.GenerativeModel(MODEL_NAME) | |
| prompt = f"""You are a JSON generator. Create a subtitle style based on: "{req.description}". Return JSON only. Keys: primaryColor, outlineColor, backType (solid/transparent/outline), font (vazir/lalezar/bangers/roboto), fontSize (30-90).""" | |
| res = model.generate_content(prompt, generation_config={"response_mime_type": "application/json"}) | |
| data = json.loads(res.text.replace('```json', '').replace('```', '').strip()) | |
| return {"primaryColor": data.get("primaryColor", "#FFFFFF"), "outlineColor": data.get("outlineColor", "#000000"), "backType": data.get("backType", "solid"), "font": data.get("font", "vazir"), "fontSize": int(data.get("fontSize", 60))} | |
| except: continue | |
| return {"primaryColor":"#FFFFFF", "outlineColor":"#000000", "font":"vazir", "fontSize":60, "backType":"solid"} | |
| def upload(file: UploadFile = File(...)): | |
| if not API_KEYS: raise HTTPException(500, "API Keys Missing") | |
| fid = str(uuid.uuid4())[:8]; ext = file.filename.split('.')[-1] | |
| raw_path, fixed_path, audio_path = f"{TEMP_DIR}/{fid}_raw.{ext}", f"{TEMP_DIR}/{fid}.mp4", f"{TEMP_DIR}/{fid}.mp3" | |
| try: | |
| with open(raw_path, "wb") as f: shutil.copyfileobj(file.file, f) | |
| subprocess.run(["ffmpeg", "-y", "-i", raw_path, "-r", "30", "-c:v", "libx264", "-preset", "ultrafast", "-c:a", "copy", fixed_path], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) | |
| w, h, duration = get_video_info(fixed_path) | |
| subprocess.run(["ffmpeg", "-y", "-i", fixed_path, "-vn", "-acodec", "libmp3lame", "-q:a", "4", audio_path], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) | |
| file_to_send = audio_path | |
| except Exception as e: raise HTTPException(500, f"File Processing Error: {e}") | |
| for _ in range(50): | |
| try: | |
| genai.configure(api_key=random.choice(API_KEYS)) | |
| vf = genai.upload_file(path=file_to_send) | |
| while vf.state.name == "PROCESSING": time.sleep(2); vf = genai.get_file(vf.name) | |
| if vf.state.name == "FAILED": raise Exception("Gemini Failed") | |
| model = genai.GenerativeModel(MODEL_NAME) | |
| prompt = f"The audio is {duration:.2f}s. Transcribe Persian speech to JSON. Timestamps MUST NOT exceed {duration:.2f}s. JSON: {{segments: [{{start, end, text, keywords}}], style_suggestion: {{...}}}}" | |
| res = model.generate_content([vf, prompt], generation_config={"response_mime_type": "application/json"}) | |
| data = json.loads(res.text.replace('```json', '').replace('```', '').strip()) | |
| raw_segs = data.get("segments", []); final_segs = [] | |
| if not raw_segs: raise Exception("Empty transcript") | |
| seg_cnt = 0 | |
| for s in raw_segs: | |
| base_start, base_end = float(s.get("start", 0)), float(s.get("end", 0)) | |
| if base_start >= duration: continue | |
| base_end = min(base_end, duration) | |
| if base_end <= base_start: base_end = base_start + 1.0 | |
| raw_words = s.get("text", "").split() | |
| if not raw_words: continue | |
| full_dur = base_end - base_start | |
| total_wc = len(raw_words) | |
| if total_wc == 0: continue | |
| for k in range(0, total_wc, 9): | |
| chunk = raw_words[k : k+9] | |
| if not chunk: continue | |
| c_start = round(base_start + (full_dur * (k / total_wc)), 3) | |
| c_end = round(base_start + (full_dur * ((k + len(chunk)) / total_wc)), 3) | |
| c_words = [] | |
| chunk_dur = c_end - c_start | |
| for j, w in enumerate(chunk): | |
| w_s = round(c_start + (chunk_dur * j / len(chunk)), 3) | |
| w_e = round(c_start + (chunk_dur * (j + 1) / len(chunk)), 3) | |
| c_words.append({ | |
| "word": w, | |
| "start": w_s, | |
| "end": w_e, | |
| "highlight": w in s.get("keywords", []) | |
| }) | |
| final_segs.append({ | |
| "id": seg_cnt, | |
| "start": c_start, | |
| "end": c_end, | |
| "text": " ".join(chunk), | |
| "words": c_words | |
| }) | |
| seg_cnt += 1 | |
| try: genai.delete_file(vf.name) | |
| except: pass | |
| if os.path.exists(audio_path): os.remove(audio_path) | |
| if os.path.exists(raw_path): os.remove(raw_path) | |
| return {"file_id": fid, "url": f"/temp/{fid}.mp4", "width": w, "height": h, "segments": final_segs, "suggested_style": data.get("style_suggestion")} | |
| except Exception as e: print(e); continue | |
| raise HTTPException(500, "Failed after 50 attempts") | |
| async def reupload_video(file: UploadFile = File(...), file_id: str = Form(...)): | |
| if not file_id or '/' in file_id or '\\' in file_id: raise HTTPException(400, "Invalid file_id") | |
| target_path = os.path.join(TEMP_DIR, f"{file_id}.mp4") | |
| try: | |
| with open(target_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) | |
| except Exception as e: raise HTTPException(500, f"Could not save file: {e}") | |
| finally: await file.close() | |
| return {"status": "success", "message": f"File {file_id}.mp4 restored."} | |
| async def enqueue_render(req: ProcessRequest): | |
| if not os.path.exists(os.path.join(TEMP_DIR, f"{req.file_id}.mp4")): | |
| return JSONResponse(status_code=200, content={"error": "Video not found", "error_code": "VIDEO_NOT_FOUND"}) | |
| job_id = str(uuid.uuid4()) | |
| jobs_db[job_id] = Job(job_id, req) | |
| await render_queue.put(job_id) | |
| return {"job_id": job_id, "status": JobStatus.QUEUED} | |
| async def get_job_status(job_id: str): | |
| job = jobs_db.get(job_id) | |
| if not job: raise HTTPException(404, "Job not found") | |
| response = {"job_id": job.id, "status": job.status} | |
| if job.status == JobStatus.QUEUED: | |
| response["queue_position"] = sum(1 for j in jobs_db.values() if j.status == JobStatus.QUEUED and j.created_at < job.created_at) + 1 | |
| elif job.status == JobStatus.COMPLETED: response["url"] = job.result_url | |
| elif job.status == JobStatus.FAILED: response["error"] = job.error_message | |
| return response |