Spaces:
Sleeping
Sleeping
| """ | |
| AI Story-to-Video Generator — FastAPI Backend | |
| CPU-safe: orchestration only. Heavy models routed to external APIs. | |
| """ | |
| # Load .env FIRST before any other imports that read os.getenv() | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| import asyncio | |
| import os | |
| import sys | |
| # Windows-specific: ensure ProactorEventLoop is used for subprocess support | |
| if sys.platform == 'win32': | |
| asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy()) | |
| from contextlib import asynccontextmanager | |
| from fastapi import FastAPI, BackgroundTasks, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| from pydantic import BaseModel, Field | |
| from typing import Optional | |
| from utils.job_manager import JobManager | |
| from utils.storage import ensure_dirs | |
| from pipeline.script_engine import ScriptEngine | |
| from pipeline.image_engine import ImageEngine | |
| from pipeline.video_engine import VideoEngine | |
| from pipeline.tts_engine import TTSEngine | |
| from pipeline.merge_engine import MergeEngine | |
| # --------------------------------------------------------------------------- | |
| # Lifespan | |
| # --------------------------------------------------------------------------- | |
async def cleanup_task():
    """Background task: delete stale items from ./temp and ./outputs hourly.

    Runs forever. Each sweep removes any file or directory in the two
    working folders whose mtime is older than 2 hours. Failures are logged
    per item and never propagate, so the loop survives filesystem races.
    """
    import shutil
    import time

    max_age = 2 * 3600  # seconds before an artifact is considered stale
    while True:
        now = time.time()
        for folder in ("temp", "outputs"):
            if not os.path.isdir(folder):
                continue
            for item in os.listdir(folder):
                path = os.path.join(folder, item)
                try:
                    # BUG FIX: guard each item individually. Previously a
                    # single getmtime() on a file deleted concurrently (e.g.
                    # by a finishing job) raised and aborted the whole sweep,
                    # leaving the remaining entries uncleaned for an hour.
                    if os.path.getmtime(path) < now - max_age:
                        if os.path.isdir(path):
                            shutil.rmtree(path)
                        else:
                            os.remove(path)
                except Exception as e:
                    print(f"🧹 Cleanup error: {e}")
        await asyncio.sleep(3600)  # Sleep 1 hour between sweeps
@asynccontextmanager
async def lifespan(app: FastAPI):
    """App lifespan: create working directories and start the cleanup loop.

    BUG FIX: the original lacked @asynccontextmanager even though it is
    imported above — FastAPI's `lifespan=` expects an async context manager
    factory, so the bare async generator function would fail at startup.
    Also retains a reference to the created task: the event loop keeps only
    weak references, so an unreferenced task may be garbage-collected.
    """
    ensure_dirs()
    app.state.cleanup_task = asyncio.create_task(cleanup_task())
    yield
app = FastAPI(
    title="AI Story-to-Video Generator",
    description="CPU-safe pipeline: orchestrates LLM, image, video, and TTS APIs.",
    version="1.0.0",
    lifespan=lifespan,
)

# Wide-open CORS: any origin, method, and header may call this API.
# NOTE(review): allow_origins=["*"] combined with allow_credentials=True is
# rejected by browsers for credentialed requests — confirm credentials are
# actually needed, otherwise drop allow_credentials.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Serve finished videos as static files under /outputs.
os.makedirs("outputs", exist_ok=True)
app.mount("/outputs", StaticFiles(directory="outputs"), name="outputs")

# ---------------------------------------------------------------------------
# Shared state
# ---------------------------------------------------------------------------
# Single in-process job registry shared by the routes and the pipeline task.
job_manager = JobManager()
| # --------------------------------------------------------------------------- | |
| # Request / Response schemas | |
| # --------------------------------------------------------------------------- | |
class GenerateRequest(BaseModel):
    """Request payload for starting a generation job: story plus options."""
    story: str = Field(..., min_length=10, max_length=4000, description="User story or prompt")
    style: str = Field(default="cinematic", description="Visual style (cinematic, anime, watercolor, …)")
    duration: int = Field(default=30, ge=10, le=60, description="Target video duration in seconds")
    aspect_ratio: str = Field(default="16:9", description="Output aspect ratio: 16:9 or 9:16")
class GenerateResponse(BaseModel):
    """Returned immediately after a job is queued."""
    job_id: str  # opaque id to poll status with
    message: str
class StatusResponse(BaseModel):
    """Polling result: pipeline progress plus final artifacts when done."""
    job_id: str
    status: str  # queued | running | completed | failed
    progress: int  # 0-100
    stage: str  # human-readable label of the current pipeline step
    video_url: Optional[str] = None  # set by the pipeline on completion
    script: Optional[dict] = None  # generated screenplay, once available
    error: Optional[str] = None  # set only when status == "failed"
| # --------------------------------------------------------------------------- | |
| # Background pipeline | |
| # --------------------------------------------------------------------------- | |
async def run_pipeline(job_id: str, req: GenerateRequest):
    """Full async pipeline executed in the background.

    Stages: script -> character refs -> storyboard -> animation -> TTS ->
    merge. Progress (0-100) and a human-readable stage string are pushed to
    the shared JobManager after each step; any exception marks the job
    failed with an error message.
    """

    def update(progress: int, stage: str):
        # Convenience wrapper for progress-only status updates.
        job_manager.update(job_id, progress=progress, stage=stage)

    try:
        job_manager.update(job_id, status="running", progress=5, stage="Generating screenplay…")

        # STEP 1 — Script: turn the raw story into a structured screenplay.
        script = await ScriptEngine().generate(
            story=req.story,
            style=req.style,
            duration=req.duration,
        )
        job_manager.update(job_id, progress=15, stage="Screenplay ready", script=script)

        scenes = script.get("scenes", [])
        if not scenes:
            raise ValueError("Script engine returned no scenes.")
        # Clamp to at most 8 scenes to bound downstream API cost and latency.
        scenes = scenes[:8]
        script["scenes"] = scenes

        # STEP 2 — Character reference images.
        update(20, "Generating character references…")
        image_engine = ImageEngine()
        char_images = await image_engine.generate_characters(
            characters=script.get("characters", []),
            style=req.style,
        )

        # STEP 3 — Storyboard keyframes (parallel).
        update(35, "Generating storyboard frames…")
        frame_paths = await image_engine.generate_storyboard(
            scenes=scenes,
            char_images=char_images,
            style=req.style,
            job_id=job_id,
        )

        # STEP 4 — Animate each keyframe into a video clip.
        update(55, "Animating scenes…")
        video_engine = VideoEngine()
        clip_paths = await video_engine.animate_scenes(
            frame_paths=frame_paths,
            scenes=scenes,
            job_id=job_id,
        )

        # STEP 5 — TTS narration audio.
        update(75, "Generating narration audio…")
        tts_engine = TTSEngine()
        audio_path = await tts_engine.generate(
            script=script,
            job_id=job_id,
        )

        # STEP 6 — Merge clips and narration into the final video.
        update(88, "Merging final video…")
        merge_engine = MergeEngine()
        output_path = await merge_engine.merge(
            clip_paths=clip_paths,
            audio_path=audio_path,
            aspect_ratio=req.aspect_ratio,
            job_id=job_id,
        )

        video_url = f"/outputs/{os.path.basename(output_path)}"
        job_manager.update(
            job_id,
            status="completed",
            progress=100,
            stage="Done!",
            video_url=video_url,
        )
    except Exception as exc:
        import traceback

        tb = traceback.format_exc()
        # BUG FIX: the original expression parsed as
        #   (str(exc) or tb.split("\n")[-2]) if tb else "Unknown error"
        # because the conditional expression binds looser than `or`.
        # Parenthesize so the traceback line is the fallback only when the
        # exception itself has no message.
        error_msg = str(exc) or (tb.split("\n")[-2] if tb else "Unknown error")
        print(f"\n❌ PIPELINE ERROR (job {job_id}):\n{tb}")
        job_manager.update(job_id, status="failed", stage="Pipeline error", error=error_msg)
| # --------------------------------------------------------------------------- | |
| # Routes | |
| # --------------------------------------------------------------------------- | |
@app.post("/generate", response_model=GenerateResponse)
async def generate(req: GenerateRequest, background_tasks: BackgroundTasks):
    """Start a video generation job. Returns job_id immediately.

    NOTE(review): the original function had no route decorator although this
    section is labelled "Routes", so the endpoint was never registered. Path
    inferred from the function name — confirm against the frontend's calls.
    """
    # Backpressure: reject new work beyond MAX_CONCURRENT_JOBS (default 2).
    if job_manager.active_count() >= int(os.getenv("MAX_CONCURRENT_JOBS", "2")):
        raise HTTPException(status_code=429, detail="Server busy. Try again shortly.")
    job_id = job_manager.create()
    background_tasks.add_task(run_pipeline, job_id, req)
    return GenerateResponse(job_id=job_id, message="Job queued successfully.")
@app.get("/status/{job_id}", response_model=StatusResponse)
async def status(job_id: str):
    """Poll job status; 404 when the job_id is unknown.

    NOTE(review): route decorator added — the original had none, so the
    endpoint was never registered. Confirm the path against the frontend.
    """
    job = job_manager.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found.")
    return StatusResponse(job_id=job_id, **job)
@app.get("/health")
async def health():
    """Liveness probe: reports OK plus the current number of active jobs.

    NOTE(review): route decorator added — the original had none, so the
    endpoint was never registered.
    """
    return {"status": "ok", "active_jobs": job_manager.active_count()}