babaTEEpe's picture
Update app.py
035f051 verified
"""
AI Story-to-Video Generator — FastAPI Backend
CPU-safe: orchestration only. Heavy models routed to external APIs.
"""
# Load .env FIRST before any other imports that read os.getenv()
from dotenv import load_dotenv
load_dotenv()
import asyncio
import os
import sys
# Windows-specific: ensure ProactorEventLoop is used for subprocess support
if sys.platform == 'win32':
asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())
from contextlib import asynccontextmanager
from fastapi import FastAPI, BackgroundTasks, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, Field
from typing import Optional
from utils.job_manager import JobManager
from utils.storage import ensure_dirs
from pipeline.script_engine import ScriptEngine
from pipeline.image_engine import ImageEngine
from pipeline.video_engine import VideoEngine
from pipeline.tts_engine import TTSEngine
from pipeline.merge_engine import MergeEngine
# ---------------------------------------------------------------------------
# Lifespan
# ---------------------------------------------------------------------------
async def cleanup_task():
"""Background task to delete old temp files and outputs every hour."""
import shutil
import time
while True:
try:
now = time.time()
# Clean /temp and /outputs older than 2 hours
for folder in ["temp", "outputs"]:
if os.path.exists(folder):
for item in os.listdir(folder):
path = os.path.join(folder, item)
if os.path.getmtime(path) < now - (2 * 3600):
if os.path.isdir(path):
shutil.rmtree(path)
else:
os.remove(path)
except Exception as e:
print(f"🧹 Cleanup error: {e}")
await asyncio.sleep(3600) # Sleep 1 hour
@asynccontextmanager
async def lifespan(app: FastAPI):
ensure_dirs()
asyncio.create_task(cleanup_task())
yield
app = FastAPI(
title="AI Story-to-Video Generator",
description="CPU-safe pipeline: orchestrates LLM, image, video, and TTS APIs.",
version="1.0.0",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Serve finished videos
os.makedirs("outputs", exist_ok=True)
app.mount("/outputs", StaticFiles(directory="outputs"), name="outputs")
# ---------------------------------------------------------------------------
# Shared state
# ---------------------------------------------------------------------------
job_manager = JobManager()
# ---------------------------------------------------------------------------
# Request / Response schemas
# ---------------------------------------------------------------------------
class GenerateRequest(BaseModel):
story: str = Field(..., min_length=10, max_length=4000, description="User story or prompt")
style: str = Field(default="cinematic", description="Visual style (cinematic, anime, watercolor, …)")
duration: int = Field(default=30, ge=10, le=60, description="Target video duration in seconds")
aspect_ratio: str = Field(default="16:9", description="Output aspect ratio: 16:9 or 9:16")
class GenerateResponse(BaseModel):
job_id: str
message: str
class StatusResponse(BaseModel):
job_id: str
status: str # queued | running | completed | failed
progress: int # 0-100
stage: str
video_url: Optional[str] = None
script: Optional[dict] = None
error: Optional[str] = None
# ---------------------------------------------------------------------------
# Background pipeline
# ---------------------------------------------------------------------------
async def run_pipeline(job_id: str, req: GenerateRequest):
"""Full async pipeline executed in the background."""
def update(progress: int, stage: str):
job_manager.update(job_id, progress=progress, stage=stage)
try:
job_manager.update(job_id, status="running", progress=5, stage="Generating screenplay…")
# STEP 1 — Script
script = await ScriptEngine().generate(
story=req.story,
style=req.style,
duration=req.duration,
)
job_manager.update(job_id, progress=15, stage="Screenplay ready", script=script)
scenes = script.get("scenes", [])
if not scenes:
raise ValueError("Script engine returned no scenes.")
# Clamp to 6-8 scenes
scenes = scenes[:8]
script["scenes"] = scenes
# STEP 2 — Character images
update(20, "Generating character references…")
image_engine = ImageEngine()
char_images = await image_engine.generate_characters(
characters=script.get("characters", []),
style=req.style,
)
# STEP 3 — Storyboard keyframes (parallel)
update(35, "Generating storyboard frames…")
frame_paths = await image_engine.generate_storyboard(
scenes=scenes,
char_images=char_images,
style=req.style,
job_id=job_id,
)
# STEP 4 — Video clips
update(55, "Animating scenes…")
video_engine = VideoEngine()
clip_paths = await video_engine.animate_scenes(
frame_paths=frame_paths,
scenes=scenes,
job_id=job_id,
)
# STEP 5 — TTS narration
update(75, "Generating narration audio…")
tts_engine = TTSEngine()
audio_path = await tts_engine.generate(
script=script,
job_id=job_id,
)
# STEP 6 — Merge
update(88, "Merging final video…")
merge_engine = MergeEngine()
output_path = await merge_engine.merge(
clip_paths=clip_paths,
audio_path=audio_path,
aspect_ratio=req.aspect_ratio,
job_id=job_id,
)
video_url = f"/outputs/{os.path.basename(output_path)}"
job_manager.update(
job_id,
status="completed",
progress=100,
stage="Done!",
video_url=video_url,
)
except Exception as exc:
import traceback
tb = traceback.format_exc()
error_msg = str(exc) or tb.split("\n")[-2] if tb else "Unknown error"
print(f"\n❌ PIPELINE ERROR (job {job_id}):\n{tb}")
job_manager.update(job_id, status="failed", stage="Pipeline error", error=error_msg)
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.post("/generate", response_model=GenerateResponse, status_code=202)
async def generate(req: GenerateRequest, background_tasks: BackgroundTasks):
"""Start a video generation job. Returns job_id immediately."""
if job_manager.active_count() >= int(os.getenv("MAX_CONCURRENT_JOBS", "2")):
raise HTTPException(status_code=429, detail="Server busy. Try again shortly.")
job_id = job_manager.create()
background_tasks.add_task(run_pipeline, job_id, req)
return GenerateResponse(job_id=job_id, message="Job queued successfully.")
@app.get("/status/{job_id}", response_model=StatusResponse)
async def status(job_id: str):
"""Poll job status."""
job = job_manager.get(job_id)
if not job:
raise HTTPException(status_code=404, detail="Job not found.")
return StatusResponse(job_id=job_id, **job)
@app.get("/health")
async def health():
return {"status": "ok", "active_jobs": job_manager.active_count()}