MarneMorgan's picture
Update app.py
d00424a verified
import os
import uuid
import shutil
import subprocess
from pathlib import Path
from typing import List, Optional, Dict, Any
from fastapi import FastAPI, HTTPException
from fastapi.responses import FileResponse
from pydantic import BaseModel, Field
from downloader import download_to
from probe import ffprobe_json
from planner import plan as make_plan, repair as repair_plan
from builder import build_ffmpeg_cmd
from safety import parse_ffmpeg_cmd, enforce_local_inputs, make_exec_args
from fastapi.responses import RedirectResponse
# ASGI application object; the route decorators in this file register handlers on it.
app = FastAPI(title="AI + FFmpeg Render Webhook (CPU Best)")
# Parent directory under which make_job_dir() creates one working directory per job.
BASE = Path("/tmp/ai_ffmpeg_jobs")
# Timeout, in seconds, passed to subprocess.run for a single FFmpeg invocation.
# Read from the MAX_RENDER_SECONDS environment variable (default 300).
MAX_RENDER_SECONDS = int(os.getenv("MAX_RENDER_SECONDS", "300"))
# Number of plan-repair retries /render attempts after a failed FFmpeg run.
# Read from the MAX_REPAIR_TRIES environment variable (default 1).
MAX_REPAIR_TRIES = int(os.getenv("MAX_REPAIR_TRIES", "1"))
class RenderRequest(BaseModel):
    # JSON body accepted by POST /render.

    # URLs of the source media to download. Defaults to an empty list; the
    # /render handler rejects an empty list with a 400 response.
    file_urls: List[str] = Field(default_factory=list)

    # Free-text instruction forwarded to the planner as ``user_prompt``.
    prompt: str

    # Requested name of the rendered file; it is passed through safe_name()
    # before use. ``None`` or an empty value falls back to "final.mp4".
    output_name: Optional[str] = Field(default="final.mp4")
def safe_name(name: str, fallback: str = "final.mp4") -> str:
    """Return *name* reduced to a safe, single-component file name.

    Surrounding whitespace is stripped, path separators (``/`` and ``\\``)
    and ``..`` sequences are replaced with ``_``, and the result is limited
    to 120 characters.

    Args:
        name: Caller-supplied file name; may be empty or ``None``.
        fallback: Name used when *name* is empty, whitespace-only, or
            sanitises to a lone ``.``.

    Returns:
        The sanitised name, at most 120 characters long. When an over-long
        name has a short extension, the stem is shortened and the extension
        kept.
    """
    max_len = 120
    max_ext_len = 16  # anything longer is treated as part of the name, not an extension

    cleaned = (name or "").strip() or fallback
    cleaned = cleaned.replace("/", "_").replace("\\", "_").replace("..", "_")

    # A lone dot would make "<dir>/<name>" point at the directory itself.
    if cleaned == ".":
        cleaned = fallback

    if len(cleaned) <= max_len:
        return cleaned

    # Shorten the stem rather than chopping the extension off: the extension
    # is what tells FFmpeg which container format to write.
    stem, ext = os.path.splitext(cleaned)
    if 1 < len(ext) <= max_ext_len:
        return stem[: max_len - len(ext)] + ext
    return cleaned[:max_len]
def make_job_dir() -> Path:
    """Create a new, uniquely named job directory under ``BASE``.

    The directory is named after a random UUID4 and is created together
    with its ``input`` and ``output`` subdirectories.

    Returns:
        Path of the newly created job directory.
    """
    root = BASE / str(uuid.uuid4())
    for sub in ("input", "output"):
        root.joinpath(sub).mkdir(parents=True, exist_ok=True)
    return root
def cleanup(job_dir: Path):
    """Remove *job_dir* and everything below it.

    Does nothing when the path does not exist. Errors raised while deleting
    are ignored, so this is safe to call from cleanup paths.
    """
    if not job_dir.exists():
        return
    shutil.rmtree(job_dir, ignore_errors=True)
def run_ffmpeg(job_dir: Path, cmd_str: str, forced_output: Path) -> subprocess.CompletedProcess:
    """Run one FFmpeg command string inside *job_dir* and return the result.

    The command string is passed through ``parse_ffmpeg_cmd``,
    ``enforce_local_inputs`` and ``make_exec_args`` from the ``safety``
    module before execution.
    NOTE(review): presumably these validate the inputs and substitute
    *forced_output* as the output path; confirm against ``safety``.

    Args:
        job_dir: Working directory for the child process.
        cmd_str: FFmpeg command line as a single string.
        forced_output: Output path handed to ``make_exec_args``.

    Returns:
        The completed process, with stdout and stderr captured as text.

    Raises:
        subprocess.TimeoutExpired: If the process runs longer than
            ``MAX_RENDER_SECONDS``.
    """
    tokens = parse_ffmpeg_cmd(cmd_str)
    enforce_local_inputs(tokens, job_dir)
    exec_args = make_exec_args(tokens, forced_output)
    # capture_output=True is shorthand for piping both stdout and stderr.
    return subprocess.run(
        exec_args,
        cwd=str(job_dir),
        capture_output=True,
        text=True,
        timeout=MAX_RENDER_SECONDS,
    )
@app.get("/")
def root():
    # Send visitors of the bare root URL to the API documentation at /docs.
    docs_redirect = RedirectResponse(url="/docs")
    return docs_redirect
@app.get("/health")
def health():
    # Health probe: always answers with a fixed "ok" payload.
    payload = {"status": "ok"}
    return payload
@app.post("/render")
def render(req: RenderRequest):
    """Download the input files, render them with FFmpeg and return the result.

    Steps: download every URL in ``file_urls``, probe each file, build an
    FFmpeg command from the prompt, and run it. If FFmpeg fails, the plan is
    repaired from the tail of its stderr and retried up to
    ``MAX_REPAIR_TRIES`` times.

    Responses:
        200: The rendered file as ``application/octet-stream``.
        400: ``file_urls`` is empty, FFmpeg failed after all retries, or any
            other error occurred while processing the request.
        408: An FFmpeg run exceeded the render timeout.
    """
    # Imported locally so this handler does not depend on edits to the
    # file's import block; starlette is installed as a FastAPI dependency.
    from starlette.background import BackgroundTask

    job_dir = make_job_dir()
    input_dir = job_dir / "input"
    output_dir = job_dir / "output"
    output_name = safe_name(req.output_name or "final.mp4")
    forced_output = output_dir / output_name

    # True once a response that still has to read the rendered file has been
    # built. FileResponse sends the file only after this handler returns, so
    # deleting the job directory in ``finally`` would remove the file before
    # it is streamed.
    response_owns_job_dir = False
    try:
        if not req.file_urls:
            raise HTTPException(400, {"status": "error", "message": "file_urls is required."})

        # 1) Download all URLs
        local_paths = [
            download_to(url, input_dir, index=i)
            for i, url in enumerate(req.file_urls)
        ]
        # Paths relative to job_dir, which is FFmpeg's working directory.
        local_files = [f"input/{p.name}" for p in local_paths]

        # 2) ffprobe metadata
        probes: Dict[str, Any] = {p.name: ffprobe_json(p) for p in local_paths}

        # 3) Plan + build command
        plan = make_plan(local_files=local_files, probes=probes, user_prompt=req.prompt)
        cmd = build_ffmpeg_cmd(local_files=local_files, plan=plan)

        # 4) Run with repair loop
        tries = 0
        last_cmd = cmd
        last_err = ""
        while True:
            proc = run_ffmpeg(job_dir, last_cmd, forced_output)
            if proc.returncode == 0 and forced_output.exists():
                response = FileResponse(
                    path=str(forced_output),
                    filename=output_name,
                    media_type="application/octet-stream",
                    # Delete the job directory only after the body was sent.
                    background=BackgroundTask(cleanup, job_dir),
                )
                response_owns_job_dir = True
                return response

            # Keep only the tail of stderr for the repair step and error body.
            last_err = (proc.stderr or "")[-2600:]
            if tries >= MAX_REPAIR_TRIES:
                break
            tries += 1

            # Repair the plan using stderr and rebuild
            plan = repair_plan(
                local_files=local_files,
                probes=probes,
                user_prompt=req.prompt,
                last_cmd=last_cmd,
                stderr_tail=last_err,
            )
            last_cmd = build_ffmpeg_cmd(local_files=local_files, plan=plan)

        raise HTTPException(
            400,
            {
                "status": "error",
                "message": "FFmpeg failed after retries",
                "last_command": last_cmd,
                "stderr_tail": last_err,
            },
        )
    except subprocess.TimeoutExpired as exc:
        raise HTTPException(408, {"status": "error", "message": "Render timed out."}) from exc
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(400, {"status": "error", "message": str(e)}) from e
    finally:
        # On every non-success path nothing needs the files any more.
        if not response_owns_job_dir:
            cleanup(job_dir)