import os
import shutil
import subprocess
import uuid
from pathlib import Path
from typing import Any, Dict, List, Optional

from fastapi import BackgroundTasks, FastAPI, HTTPException
from fastapi.responses import FileResponse, RedirectResponse
from pydantic import BaseModel, Field

from builder import build_ffmpeg_cmd
from downloader import download_to
from planner import plan as make_plan, repair as repair_plan
from probe import ffprobe_json
from safety import enforce_local_inputs, make_exec_args, parse_ffmpeg_cmd

app = FastAPI(title="AI + FFmpeg Render Webhook (CPU Best)")

# Every request gets its own sub-directory under BASE (see make_job_dir).
BASE = Path("/tmp/ai_ffmpeg_jobs")
# Wall-clock limit, in seconds, passed as the subprocess timeout of one run.
MAX_RENDER_SECONDS = int(os.getenv("MAX_RENDER_SECONDS", "300"))
# Number of plan-repair attempts allowed after the first failed run.
MAX_REPAIR_TRIES = int(os.getenv("MAX_REPAIR_TRIES", "1"))


# Request body of POST /render.
# NOTE: documented with comments rather than a docstring on purpose; a class
# docstring would be published as the schema description in the OpenAPI docs.
class RenderRequest(BaseModel):
    # URLs of the input files; fetched into the job's "input" directory.
    file_urls: List[str] = Field(default_factory=list)
    # Free-text instruction forwarded to the planner as ``user_prompt``.
    prompt: str
    # Requested name of the rendered file; sanitised with safe_name().
    output_name: Optional[str] = "final.mp4"


def safe_name(name: str, fallback: str = "final.mp4") -> str:
    """Return *name* reduced to a single, safe path component.

    Leading/trailing whitespace is stripped, path separators and ``..``
    sequences are replaced with ``_``, and the result is capped at 120
    characters.

    Args:
        name: Caller-supplied file name; may be empty or ``None``.
        fallback: Name used when *name* is empty, or when it collapses to
            ``"."`` (which would resolve to the containing directory rather
            than to a file inside it).

    Returns:
        The sanitised file name.
    """
    cleaned = (name or "").strip() or fallback
    cleaned = cleaned.replace("/", "_").replace("\\", "_").replace("..", "_")
    if cleaned == ".":
        # "output_dir / '.'" is output_dir itself, never a writable file.
        cleaned = fallback
    return cleaned[:120]


def make_job_dir() -> Path:
    """Create a fresh job directory with ``input`` and ``output`` children.

    Returns:
        Path of the new directory, ``BASE/<random uuid4>``.
    """
    job_dir = BASE / str(uuid.uuid4())
    for child in ("input", "output"):
        (job_dir / child).mkdir(parents=True, exist_ok=True)
    return job_dir


def cleanup(job_dir: Path):
    """Remove *job_dir* and everything below it, ignoring removal errors."""
    if job_dir.exists():
        shutil.rmtree(job_dir, ignore_errors=True)


def run_ffmpeg(
    job_dir: Path, cmd_str: str, forced_output: Path
) -> subprocess.CompletedProcess:
    """Run one FFmpeg command string inside *job_dir*.

    The command is passed through the ``safety`` helpers
    (``parse_ffmpeg_cmd``, ``enforce_local_inputs``, ``make_exec_args``)
    before execution; their exact checks live in that module.

    Args:
        job_dir: Working directory of the child process.
        cmd_str: Command line produced by ``build_ffmpeg_cmd``.
        forced_output: Output path handed to ``make_exec_args``.

    Returns:
        The completed process with captured, text-decoded stdout/stderr.

    Raises:
        subprocess.TimeoutExpired: The run exceeded ``MAX_RENDER_SECONDS``.
    """
    parts = parse_ffmpeg_cmd(cmd_str)
    enforce_local_inputs(parts, job_dir)
    args = make_exec_args(parts, forced_output)
    return subprocess.run(
        args,
        cwd=str(job_dir),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        timeout=MAX_RENDER_SECONDS,
    )


# Redirect the bare root URL to the interactive API docs.
@app.get("/")
def root():
    return RedirectResponse(url="/docs")


# Liveness probe: always answers {"status": "ok"}.
@app.get("/health")
def health():
    return {"status": "ok"}


# Download the inputs, probe them, plan + build an FFmpeg command, run it
# (re-planning from the stderr tail up to MAX_REPAIR_TRIES times) and return
# the rendered file.
#
# Error responses (unchanged contract):
#   400  no file_urls, FFmpeg still failing after the retries, or any other
#        exception raised while preparing/running the job
#   408  an FFmpeg run exceeded MAX_RENDER_SECONDS
#
# NOTE: comments instead of a docstring on purpose; a handler docstring would
# be published as the operation description in the OpenAPI docs.
@app.post("/render")
def render(req: RenderRequest):
    # Validate before touching the filesystem so a bad request creates nothing.
    if not req.file_urls:
        raise HTTPException(
            400, {"status": "error", "message": "file_urls is required."}
        )

    job_dir = make_job_dir()
    input_dir = job_dir / "input"
    output_dir = job_dir / "output"
    output_name = safe_name(req.output_name or "final.mp4")
    forced_output = output_dir / output_name

    # Set once the job directory's removal has been handed to the response.
    cleanup_deferred = False

    try:
        # 1) Download all URLs
        local_paths = [
            download_to(url, input_dir, index=i)
            for i, url in enumerate(req.file_urls)
        ]
        local_files = [f"input/{p.name}" for p in local_paths]

        # 2) ffprobe metadata
        probes: Dict[str, Any] = {p.name: ffprobe_json(p) for p in local_paths}

        # 3) Plan + build command
        plan = make_plan(
            local_files=local_files, probes=probes, user_prompt=req.prompt
        )
        last_cmd = build_ffmpeg_cmd(local_files=local_files, plan=plan)

        # 4) Run with repair loop
        tries = 0
        last_err = ""
        while True:
            proc = run_ffmpeg(job_dir, last_cmd, forced_output)

            # is_file(): a directory at that path is not a usable result.
            if proc.returncode == 0 and forced_output.is_file():
                # FileResponse reads the file only after this handler has
                # returned, so the directory must outlive the function.
                # Removing it in ``finally`` would delete the file before it
                # is sent; run the cleanup after the response instead.
                after_send = BackgroundTasks()
                after_send.add_task(cleanup, job_dir)
                response = FileResponse(
                    path=str(forced_output),
                    filename=output_name,
                    media_type="application/octet-stream",
                    background=after_send,
                )
                cleanup_deferred = True
                return response

            # Keep only the tail of stderr for the repair step / error body.
            last_err = (proc.stderr or "")[-2600:]

            if tries >= MAX_REPAIR_TRIES:
                break
            tries += 1

            # Repair the plan using stderr and rebuild
            plan = repair_plan(
                local_files=local_files,
                probes=probes,
                user_prompt=req.prompt,
                last_cmd=last_cmd,
                stderr_tail=last_err,
            )
            last_cmd = build_ffmpeg_cmd(local_files=local_files, plan=plan)

        raise HTTPException(
            400,
            {
                "status": "error",
                "message": "FFmpeg failed after retries",
                "last_command": last_cmd,
                "stderr_tail": last_err,
            },
        )

    except subprocess.TimeoutExpired as exc:
        raise HTTPException(
            408, {"status": "error", "message": "Render timed out."}
        ) from exc
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(400, {"status": "error", "message": str(exc)}) from exc
    finally:
        # On every non-success path nothing else will remove the directory.
        if not cleanup_deferred:
            cleanup(job_dir)