Spaces:
Sleeping
Sleeping
| import os | |
| import shutil | |
| import subprocess | |
| import uuid | |
| import json | |
| import time | |
| import asyncio | |
| import random | |
| from datetime import datetime | |
| from typing import List, Optional, Union, Dict | |
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import google.generativeai as genai | |
| from pydantic import BaseModel | |
| from PIL import Image, ImageDraw, ImageFont | |
| app = FastAPI() | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=["*"], | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| TEMP_DIR = "temp" | |
| STATIC_DIR = "static" | |
| os.makedirs(TEMP_DIR, exist_ok=True) | |
| os.makedirs(STATIC_DIR, exist_ok=True) | |
| app.mount("/temp", StaticFiles(directory="temp"), name="temp") | |
| app.mount("/static", StaticFiles(directory="static"), name="static") | |
| MODEL_NAME = "gemini-3-flash-preview" | |
| FONT_DIR = "font" | |
| FONT_FILES_MAP = { | |
| "vazir": "Vazirmatn.ttf", "lalezar": "Lalezar.ttf", | |
| "bangers": "Bangers.ttf", "roboto": "Roboto.ttf" | |
| } | |
| raw_keys = os.getenv("ALL_GEMINI_API_KEYS", "") | |
| API_KEYS = [k.strip() for k in raw_keys.split(",") if k.strip()] | |
| if not API_KEYS: | |
| single_key = os.getenv("GEMINI_API_KEY") | |
| if single_key: API_KEYS.append(single_key) | |
| print(f"--- تعداد {len(API_KEYS)} کلید جیمینای شناسایی شد ---") | |
| class WordInfo(BaseModel): | |
| word: str; start: float; end: float | |
| highlight: Optional[bool] = False | |
| color: Optional[str] = None | |
| class SubtitleSegment(BaseModel): | |
| id: Union[str, int]; start: float; end: float; text: str | |
| words: Optional[List[WordInfo]] = [] | |
| class StyleConfig(BaseModel): | |
| font: str; fontSize: int; primaryColor: str; outlineColor: str | |
| backType: str; marginV: int | |
| x: Optional[int] = 0 | |
| name: Optional[str] = "classic" | |
| radius: Optional[int] = 16 | |
| paddingX: Optional[int] = 20 | |
| paddingY: Optional[int] = 10 | |
| class ProcessRequest(BaseModel): | |
| file_id: str; segments: List[SubtitleSegment] | |
| video_width: int; video_height: int; style: StyleConfig | |
| class StylePrompt(BaseModel): | |
| description: str | |
| class JobStatus: | |
| QUEUED = "queued"; PROCESSING = "processing" | |
| COMPLETED = "completed"; FAILED = "failed" | |
| class Job: | |
| def __init__(self, job_id: str, request_data: ProcessRequest): | |
| self.id = job_id; self.data = request_data; self.status = JobStatus.QUEUED | |
| self.created_at = datetime.now(); self.result_url = None; self.error_message = None | |
| render_queue = asyncio.Queue() | |
| jobs_db: Dict[str, Job] = {} | |
| async def queue_worker(): | |
| print("--- Queue Worker Started ---") | |
| while True: | |
| job_id = await render_queue.get() | |
| job = jobs_db.get(job_id) | |
| if job: | |
| try: | |
| print(f"Processing job: {job_id}") | |
| job.status = JobStatus.PROCESSING | |
| output_url = process_render_logic(job.data) | |
| job.result_url = output_url | |
| job.status = JobStatus.COMPLETED | |
| print(f"Job {job_id} completed.") | |
| except Exception as e: | |
| print(f"Job {job_id} failed: {e}") | |
| job.status = JobStatus.FAILED | |
| job.error_message = str(e) | |
| render_queue.task_done() | |
| async def startup_event(): asyncio.create_task(queue_worker()) | |
| def get_video_info(path): | |
| try: | |
| cmd = ["ffprobe", "-v", "error", "-select_streams", "v:0", "-show_entries", "stream=width,height,duration", "-of", "json", path] | |
| res = subprocess.run(cmd, capture_output=True, text=True) | |
| data = json.loads(res.stdout) | |
| stream = data['streams'][0] | |
| w = int(stream.get('width', 1080)); h = int(stream.get('height', 1920)); dur = stream.get('duration') | |
| if not dur: | |
| cmd_dur = ["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "json", path] | |
| res_dur = subprocess.run(cmd_dur, capture_output=True, text=True) | |
| data_dur = json.loads(res_dur.stdout) | |
| dur = data_dur['format'].get('duration', 60) | |
| return w, h, float(dur) | |
| except: return 1080, 1920, 60.0 | |
| def get_font_object(style_font_name, size): | |
| target_filename = FONT_FILES_MAP.get(style_font_name, "Vazirmatn.ttf") | |
| target_path = os.path.join(FONT_DIR, target_filename) | |
| if not os.path.exists(target_path): target_path = os.path.join(FONT_DIR, "Vazirmatn.ttf") | |
| if os.path.exists(target_path): return ImageFont.truetype(target_path, size) | |
| return ImageFont.load_default() | |
| def get_color_tuple(color_str: str, default=(255, 255, 255, 255)): | |
| if not color_str or not isinstance(color_str, str): return default | |
| color_str = color_str.strip().lower() | |
| if color_str.startswith('#'): | |
| try: | |
| hex_val = color_str.lstrip('#') | |
| if len(hex_val) == 6: return tuple(int(hex_val[i:i+2], 16) for i in (0, 2, 4)) + (255,) | |
| elif len(hex_val) == 8: return tuple(int(hex_val[i:i+2], 16) for i in (0, 2, 4, 6)) | |
| except: pass | |
| elif color_str.startswith('rgba'): | |
| try: | |
| content = color_str[color_str.find('(')+1 : color_str.rfind(')')] | |
| parts = [x.strip() for x in content.split(',')] | |
| if len(parts) >= 4: | |
| r, g, b = int(parts[0]), int(parts[1]), int(parts[2]) | |
| a = int(float(parts[3]) * 255) | |
| return (r, g, b, a) | |
| except: pass | |
| elif color_str.startswith('rgb'): | |
| try: | |
| content = color_str[color_str.find('(')+1 : color_str.rfind(')')] | |
| parts = [x.strip() for x in content.split(',')] | |
| if len(parts) >= 3: return (int(parts[0]), int(parts[1]), int(parts[2]), 255) | |
| except: pass | |
| return default | |
| def create_subtitle_image(text_parts: list, active_idx: int, width: int, height: int, style: StyleConfig, word_infos: Optional[List[WordInfo]] = None): | |
| img = Image.new('RGBA', (width, height), (0, 0, 0, 0)) | |
| draw = ImageDraw.Draw(img) | |
| font = get_font_object(style.font, style.fontSize) | |
| is_karaoke_style = style.name in ["karaoke_static", "auto_director", "karaoke_purple"] | |
| MAX_WORDS_PER_LINE = 5 | |
| # 1. گروهبندی کلمات در خطوط ۵ تایی | |
| lines = [] | |
| current_line = [] | |
| line_word_indices = [] | |
| current_line_indices = [] | |
| for i, word in enumerate(text_parts): | |
| current_line.append(word) | |
| current_line_indices.append(i) | |
| if len(current_line) == MAX_WORDS_PER_LINE: | |
| lines.append(current_line) | |
| line_word_indices.append(current_line_indices) | |
| current_line = [] | |
| current_line_indices = [] | |
| if current_line: | |
| lines.append(current_line) | |
| line_word_indices.append(current_line_indices) | |
| space_w = draw.textlength(" ", font=font) | |
| line_metrics = [] | |
| max_line_width = 0 | |
| for line_words in lines: | |
| w_widths = [] | |
| l_width = 0 | |
| full_line_text = " ".join(line_words) | |
| try: | |
| l_width = draw.textlength(full_line_text, font=font, direction='rtl', language='fa') | |
| except: | |
| l_width = font.getlength(full_line_text) | |
| if l_width > max_line_width: | |
| max_line_width = l_width | |
| for w in line_words: | |
| try: wl = draw.textlength(w, font=font, direction='rtl', language='fa') | |
| except: wl = font.getlength(w) | |
| w_widths.append(wl) | |
| line_metrics.append({ | |
| "width": l_width, | |
| "words": line_words, | |
| "word_widths": w_widths | |
| }) | |
| line_height_px = int(style.fontSize * 1.5) | |
| total_block_height = len(lines) * line_height_px | |
| bottom_reference = height - style.marginV | |
| start_y_of_block = bottom_reference - total_block_height | |
| if not is_karaoke_style and style.name not in ["plain_white", "white_outline"] and style.backType in ['solid', 'transparent']: | |
| ratio = height / width | |
| box_center_y_adjustment = 0 | |
| if ratio > 1.6: box_center_y_adjustment = 5 | |
| elif ratio > 1.1: box_center_y_adjustment = 10 | |
| else: box_center_y_adjustment = 15 | |
| box_center_x = width / 2 + style.x | |
| box_width = max_line_width + (style.paddingX * 2) | |
| box_x1 = box_center_x - (box_width / 2) | |
| box_x2 = box_center_x + (box_width / 2) | |
| visual_top_correction = int(style.fontSize * 0.12) | |
| box_y1 = start_y_of_block - style.paddingY + box_center_y_adjustment + visual_top_correction | |
| box_y2 = start_y_of_block + total_block_height + style.paddingY + box_center_y_adjustment - (line_height_px * 0.3) | |
| fill_color_tuple = get_color_tuple(style.outlineColor, (0, 0, 0, 255)) | |
| if style.backType == 'transparent' and fill_color_tuple[3] == 255: | |
| fill_color_tuple = (fill_color_tuple[0], fill_color_tuple[1], fill_color_tuple[2], 160) | |
| draw.rounded_rectangle([box_x1, box_y1, box_x2, box_y2], radius=style.radius, fill=fill_color_tuple) | |
| current_line_y = start_y_of_block | |
| for line_idx, metrics in enumerate(line_metrics): | |
| start_x = (width + metrics["width"]) / 2 + style.x | |
| cursor_x = start_x | |
| text_y_pos = current_line_y + (line_height_px * 0.005) | |
| words_to_draw = [] | |
| global_indices = line_word_indices[line_idx] | |
| for w_i, word in enumerate(metrics["words"]): | |
| w_len = metrics["word_widths"][w_i] | |
| word_x = cursor_x - w_len | |
| global_idx = global_indices[w_i] | |
| is_active = (global_idx == active_idx) | |
| words_to_draw.append({ | |
| "text": word, | |
| "x": word_x, | |
| "y": text_y_pos, | |
| "width": w_len, | |
| "is_active": is_active, | |
| "global_idx": global_idx | |
| }) | |
| cursor_x -= (w_len + space_w) | |
| VERTICAL_CORRECTION = int(style.fontSize * 0.22) | |
| for item in words_to_draw: | |
| if item["is_active"] and is_karaoke_style: | |
| pad_x, pad_y = style.paddingX, style.paddingY | |
| box_color = (160, 32, 240, 255) | |
| clean_primary = get_color_tuple(style.primaryColor, (160, 32, 240, 255)) | |
| if style.name == "auto_director": | |
| box_color = (0, 215, 255, 255) if item["global_idx"] % 2 == 0 else (255, 0, 128, 255) | |
| elif style.name == "karaoke_static": | |
| box_color = clean_primary | |
| # --- تغییر: حذف کد تغییر رنگ کادر (باکس) بر اساس رنگ اختصاصی --- | |
| # قبلاً اینجا یک بلوک try/except برای override رنگ باکس بود که حذف شد. | |
| rect_y1 = item["y"] - int(pad_y * 0.7) + VERTICAL_CORRECTION | |
| rect_y2 = item["y"] + style.fontSize + int(pad_y * 0.7) + VERTICAL_CORRECTION | |
| draw.rounded_rectangle( | |
| [item["x"] - pad_x, rect_y1, item["x"] + item["width"] + pad_x, rect_y2], | |
| radius=style.radius, fill=box_color | |
| ) | |
| for i, item in enumerate(words_to_draw): | |
| if style.name == "progressive_write" and active_idx != -1 and item["global_idx"] > active_idx: | |
| continue | |
| text_color, stroke_color, stroke_width = (255,255,255,255), (0,0,0,255), 0 | |
| if style.name == "plain_white": text_color = (255,255,255,255) | |
| elif style.name == "white_outline": | |
| text_color, stroke_color, stroke_width = (255,255,255,255), (0,0,0,255), max(2, int(style.fontSize / 12)) | |
| elif not is_karaoke_style: | |
| text_color = get_color_tuple(style.primaryColor, (255,255,255,255)) | |
| stroke_color = get_color_tuple(style.outlineColor, (0,0,0,255)) | |
| stroke_width = max(2, int(style.fontSize / 12)) if style.backType == 'outline' else 0 | |
| # --- تغییر: تغییر رنگ متن (Text Color) حتی در حالت کارائوکه --- | |
| try: | |
| if word_infos and item["global_idx"] < len(word_infos): | |
| w_obj = word_infos[item["global_idx"]] | |
| if hasattr(w_obj, 'color') and w_obj.color: | |
| # شرط if not is_karaoke_style حذف شد | |
| text_color = get_color_tuple(w_obj.color, text_color) | |
| except: pass | |
| # جابجا کردن متن به سمت بالا برای هماهنگی با کادر (کاهش Y) | |
| draw.text((item["x"], item["y"] - int(style.fontSize * 0.05)), item["text"], font=font, fill=text_color, stroke_width=stroke_width, stroke_fill=stroke_color, direction='rtl', language='fa') | |
| current_line_y += line_height_px | |
| return img | |
| def generate_subtitle_video(data: ProcessRequest, temp_dir: str): | |
| list_file = os.path.join(temp_dir, f"{data.file_id}_list.txt") | |
| empty_img_path = os.path.join(temp_dir, "empty.png") | |
| if not os.path.exists(empty_img_path): Image.new('RGBA', (data.video_width, data.video_height), (0, 0, 0, 0)).save(empty_img_path) | |
| with open(list_file, "w") as f: | |
| is_dynamic = data.style.name in ["karaoke_static", "auto_director", "karaoke_purple", "progressive_write"] | |
| current_timeline = 0.0 | |
| sorted_segments = sorted(data.segments, key=lambda x: x.start) | |
| for idx, seg in enumerate(sorted_segments): | |
| start_time = round(max(seg.start, current_timeline), 3) | |
| end_time = round(max(seg.end, start_time + 0.1), 3) | |
| if end_time - start_time < 0.04: continue | |
| gap = round(start_time - current_timeline, 3) | |
| if gap > 0.005: | |
| f.write(f"file 'empty.png'\nduration {gap:.3f}\n") | |
| current_timeline += gap | |
| current_timeline = start_time | |
| available_duration = round(end_time - current_timeline, 3) | |
| words = [w.word for w in seg.words] if seg.words else seg.text.split() | |
| if seg.words and is_dynamic and len(words) > 0: | |
| seg.words.sort(key=lambda x: x.start) | |
| words = [w.word for w in seg.words] # اصلاح: بروزرسانی لیست کلمات پس از مرتبسازی | |
| word_files, total_word_raw_duration = [], 0 | |
| for i, w_info in enumerate(seg.words): | |
| name = f"sub_{data.file_id}_{idx}_{i}.png" | |
| img = create_subtitle_image(words, i, data.video_width, data.video_height, data.style, seg.words) | |
| img.save(os.path.join(temp_dir, name)) | |
| raw_dur = max(0.04, w_info.end - w_info.start) | |
| word_files.append({"file": name, "dur": raw_dur}) | |
| total_word_raw_duration += raw_dur | |
| scale_factor = available_duration / total_word_raw_duration if total_word_raw_duration > 0 else 1 | |
| accumulated_written = 0.0 | |
| for wf in word_files: | |
| final_dur = max(0.01, round(wf["dur"] * scale_factor, 3)) | |
| f.write(f"file '{wf['file']}'\nduration {final_dur:.3f}\n") | |
| accumulated_written += final_dur | |
| current_timeline += accumulated_written | |
| else: | |
| name = f"sub_{data.file_id}_{idx}_full.png" | |
| img = create_subtitle_image(words, -1, data.video_width, data.video_height, data.style, seg.words) | |
| img.save(os.path.join(temp_dir, name)) | |
| f.write(f"file '{name}'\nduration {available_duration:.3f}\n") | |
| current_timeline += available_duration | |
| remaining_in_segment = round(end_time - current_timeline, 3) | |
| if remaining_in_segment > 0.005: | |
| last_used = f"sub_{data.file_id}_{idx}_{len(words)-1}.png" if (seg.words and is_dynamic and len(words)>0) else f"sub_{data.file_id}_{idx}_full.png" | |
| f.write(f"file '{last_used}'\nduration {remaining_in_segment:.3f}\n") | |
| current_timeline += remaining_in_segment | |
| f.write(f"file 'empty.png'\nduration 30.0\n") | |
| return list_file | |
| def process_render_logic(req: ProcessRequest) -> str: | |
| req.segments = [s for s in req.segments if s.end > s.start] | |
| req.segments.sort(key=lambda x: x.start) | |
| lst = generate_subtitle_video(req, TEMP_DIR) | |
| inp = f"{TEMP_DIR}/{req.file_id}.mp4" | |
| if not os.path.exists(inp): raise Exception("Input video not found") | |
| sub_video_path = f"{TEMP_DIR}/{req.file_id}_sub_render.mov" | |
| out = f"{TEMP_DIR}/{req.file_id}_final_{int(time.time())}.mp4" | |
| cmd_step1 = ["ffmpeg", "-y", "-f", "concat", "-safe", "0", "-i", lst, "-r", "30", "-s", f"{req.video_width}x{req.video_height}", "-c:v", "png", "-pix_fmt", "rgba", sub_video_path] | |
| res1 = subprocess.run(cmd_step1, capture_output=True, text=True) | |
| if res1.returncode != 0: raise Exception(f"Subtitle generation failed: {res1.stderr}") | |
| cmd_step2 = ["ffmpeg", "-y", "-i", inp, "-i", sub_video_path, "-filter_complex", "[0:v][1:v]overlay=0:0:eof_action=pass[outv]", "-map", "[outv]", "-map", "0:a", "-c:v", "libx264", "-r", "30", "-preset", "ultrafast", "-c:a", "aac", out] | |
| res2 = subprocess.run(cmd_step2, capture_output=True, text=True) | |
| if res2.returncode != 0: raise Exception(f"Merge failed: {res2.stderr}") | |
| if os.path.exists(sub_video_path): os.remove(sub_video_path) | |
| return f"/temp/{os.path.basename(out)}" | |
| async def index(): return FileResponse("index.html") | |
| def generate_style_api(req: StylePrompt): | |
| if not API_KEYS: raise HTTPException(500, "API Keys Missing") | |
| for _ in range(3): | |
| try: | |
| genai.configure(api_key=random.choice(API_KEYS)) | |
| model = genai.GenerativeModel(MODEL_NAME) | |
| prompt = f"""You are a JSON generator. Create a subtitle style based on: "{req.description}". Return JSON only. Keys: primaryColor, outlineColor, backType (solid/transparent/outline), font (vazir/lalezar/bangers/roboto), fontSize (30-90).""" | |
| res = model.generate_content(prompt, generation_config={"response_mime_type": "application/json"}) | |
| data = json.loads(res.text.replace('```json', '').replace('```', '').strip()) | |
| return {"primaryColor": data.get("primaryColor", "#FFFFFF"), "outlineColor": data.get("outlineColor", "#000000"), "backType": data.get("backType", "solid"), "font": data.get("font", "vazir"), "fontSize": int(data.get("fontSize", 60))} | |
| except: continue | |
| return {"primaryColor":"#FFFFFF", "outlineColor":"#000000", "font":"vazir", "fontSize":60, "backType":"solid"} | |
| def upload(file: UploadFile = File(...)): | |
| if not API_KEYS: raise HTTPException(500, "API Keys Missing") | |
| fid = str(uuid.uuid4())[:8]; ext = file.filename.split('.')[-1] | |
| raw_path, fixed_path, audio_path = f"{TEMP_DIR}/{fid}_raw.{ext}", f"{TEMP_DIR}/{fid}.mp4", f"{TEMP_DIR}/{fid}.mp3" | |
| try: | |
| with open(raw_path, "wb") as f: shutil.copyfileobj(file.file, f) | |
| subprocess.run(["ffmpeg", "-y", "-i", raw_path, "-r", "30", "-c:v", "libx264", "-preset", "ultrafast", "-c:a", "copy", fixed_path], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) | |
| w, h, duration = get_video_info(fixed_path) | |
| subprocess.run(["ffmpeg", "-y", "-i", fixed_path, "-vn", "-acodec", "libmp3lame", "-q:a", "4", audio_path], check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) | |
| file_to_send = audio_path | |
| except Exception as e: raise HTTPException(500, f"File Processing Error: {e}") | |
| for _ in range(50): | |
| try: | |
| genai.configure(api_key=random.choice(API_KEYS)) | |
| vf = genai.upload_file(path=file_to_send) | |
| while vf.state.name == "PROCESSING": time.sleep(2); vf = genai.get_file(vf.name) | |
| if vf.state.name == "FAILED": raise Exception("Gemini Failed") | |
| model = genai.GenerativeModel(MODEL_NAME) | |
| prompt = f"The audio is {duration:.2f}s. Transcribe Persian speech to JSON. Timestamps MUST NOT exceed {duration:.2f}s. JSON: {{segments: [{{start, end, text, keywords}}], style_suggestion: {{...}}}}" | |
| res = model.generate_content([vf, prompt], generation_config={"response_mime_type": "application/json"}) | |
| # --- شروع بخش اصلاح شده --- | |
| data = json.loads(res.text.replace('```json', '').replace('```', '').strip()) | |
| raw_segs = data.get("segments", []); final_segs = [] | |
| if not raw_segs: raise Exception("Empty transcript") | |
| seg_cnt = 0 | |
| for s in raw_segs: | |
| base_start, base_end = float(s.get("start", 0)), float(s.get("end", 0)) | |
| if base_start >= duration: continue | |
| base_end = min(base_end, duration) | |
| if base_end <= base_start: base_end = base_start + 1.0 | |
| raw_words = s.get("text", "").split() | |
| if not raw_words: continue | |
| full_dur = base_end - base_start | |
| total_wc = len(raw_words) | |
| if total_wc == 0: continue | |
| # تقسیمبندی کلمات به دستههای ۱۰ تایی | |
| for k in range(0, total_wc, 9): | |
| chunk = raw_words[k : k+9] | |
| if not chunk: continue | |
| c_start = round(base_start + (full_dur * (k / total_wc)), 3) | |
| c_end = round(base_start + (full_dur * ((k + len(chunk)) / total_wc)), 3) | |
| c_words = [] | |
| chunk_dur = c_end - c_start | |
| for j, w in enumerate(chunk): | |
| w_s = round(c_start + (chunk_dur * j / len(chunk)), 3) | |
| w_e = round(c_start + (chunk_dur * (j + 1) / len(chunk)), 3) | |
| c_words.append({ | |
| "word": w, | |
| "start": w_s, | |
| "end": w_e, | |
| "highlight": w in s.get("keywords", []) | |
| }) | |
| final_segs.append({ | |
| "id": seg_cnt, | |
| "start": c_start, | |
| "end": c_end, | |
| "text": " ".join(chunk), | |
| "words": c_words | |
| }) | |
| seg_cnt += 1 | |
| # --- پایان بخش اصلاح شده --- | |
| try: genai.delete_file(vf.name) | |
| except: pass | |
| if os.path.exists(audio_path): os.remove(audio_path) | |
| if os.path.exists(raw_path): os.remove(raw_path) | |
| return {"file_id": fid, "url": f"/temp/{fid}.mp4", "width": w, "height": h, "segments": final_segs, "suggested_style": data.get("style_suggestion")} | |
| except Exception as e: print(e); continue | |
| raise HTTPException(500, "Failed after 50 attempts") | |
| async def reupload_video(file: UploadFile = File(...), file_id: str = Form(...)): | |
| if not file_id or '/' in file_id or '\\' in file_id: raise HTTPException(400, "Invalid file_id") | |
| target_path = os.path.join(TEMP_DIR, f"{file_id}.mp4") | |
| try: | |
| with open(target_path, "wb") as buffer: shutil.copyfileobj(file.file, buffer) | |
| except Exception as e: raise HTTPException(500, f"Could not save file: {e}") | |
| finally: await file.close() | |
| return {"status": "success", "message": f"File {file_id}.mp4 restored."} | |
| async def enqueue_render(req: ProcessRequest): | |
| if not os.path.exists(os.path.join(TEMP_DIR, f"{req.file_id}.mp4")): | |
| return JSONResponse(status_code=200, content={"error": "Video not found", "error_code": "VIDEO_NOT_FOUND"}) | |
| job_id = str(uuid.uuid4()) | |
| jobs_db[job_id] = Job(job_id, req) | |
| await render_queue.put(job_id) | |
| return {"job_id": job_id, "status": JobStatus.QUEUED} | |
| async def get_job_status(job_id: str): | |
| job = jobs_db.get(job_id) | |
| if not job: raise HTTPException(404, "Job not found") | |
| response = {"job_id": job.id, "status": job.status} | |
| if job.status == JobStatus.QUEUED: | |
| response["queue_position"] = sum(1 for j in jobs_db.values() if j.status == JobStatus.QUEUED and j.created_at < job.created_at) + 1 | |
| elif job.status == JobStatus.COMPLETED: response["url"] = job.result_url | |
| elif job.status == JobStatus.FAILED: response["error"] = job.error_message | |
| return response |