"""FastAPI service exposing the video compositor over HTTP.

Endpoints:
    GET  /          -- lists the available effects, LUTs and title options.
    GET  /health    -- liveness probe.
    POST /generate  -- accepts media uploads, calls ``generate_video`` and
                       returns the rendered MP4.
"""
import os
import shutil
import sys
import tempfile
import traceback
from pathlib import Path
from typing import List, Optional

from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.responses import Response

from modules.compositor import generate_video
from modules.lut import LUTS
from modules.particles import EFFECTS
from modules.titles import TITLE_STYLES, TITLE_ANIMATIONS, TITLE_POSITIONS, _ensure_fonts

# Pre-download all brush-calligraphy fonts at startup.
_ensure_fonts()

app = FastAPI(title="Video Generator with Effects API")

# Uploads are copied to disk in pieces of this size instead of being read
# into memory in one go.
_UPLOAD_CHUNK_SIZE = 1024 * 1024


def _upload_suffix(upload: UploadFile, default: str) -> str:
    """Return the file extension of *upload*, or *default* if it has none.

    ``upload.filename`` may be ``None`` or lack an extension; both cases
    fall back to *default* so the saved file always carries a suffix.
    """
    name = upload.filename
    if not name:
        return default
    return Path(name).suffix or default


async def _save_upload(upload: UploadFile, dest: str) -> None:
    """Copy the contents of *upload* to the path *dest* chunk by chunk."""
    with open(dest, "wb") as out:
        while True:
            chunk = await upload.read(_UPLOAD_CHUNK_SIZE)
            if not chunk:
                break
            out.write(chunk)


@app.get("/")
async def root():
    """Describe the API: available effects, LUT names, title options and features."""
    return {
        "message": "Video Generator API",
        "effects": EFFECTS,
        "luts": {k: v["name"] for k, v in LUTS.items()},
        "title_styles": TITLE_STYLES,
        "title_animations": TITLE_ANIMATIONS,
        "title_positions": TITLE_POSITIONS,
        "features": ["ken_burns", "typewriter", "cinemascope", "title", "video_input"],
    }


@app.get("/health")
async def health():
    """Liveness probe; always reports a healthy status."""
    return {"status": "healthy"}


@app.post("/generate")
async def generate(
    images: List[UploadFile] = File(None),
    video: UploadFile = File(None),
    audio: UploadFile = File(...),
    srt_text: str = Form(""),
    bgm: Optional[UploadFile] = File(None),
    effect: str = Form("snow"),
    lut: str = Form("none"),
    ken_burns: bool = Form(False),
    typewriter: bool = Form(False),
    cinemascope: bool = Form(False),
    title_text: str = Form(""),
    title_style: str = Form("brush_muyao"),
    title_color: str = Form("white"),
    title_animation: str = Form("typewriter"),
    title_duration: float = Form(4.0),
    title_position: str = Form("center"),
    title_start: float = Form(0.0),
    title_font_size: int = Form(0),
    typewriter_font_size: int = Form(0),
    fps: int = Form(25),
):
    """Render a video from the uploaded media and return it as an MP4.

    Exactly one of ``images`` or ``video`` must be supplied, together with
    the mandatory ``audio`` upload; ``bgm`` is optional. The uploads are
    written to a temporary directory and their paths, along with every
    remaining form field, are forwarded unchanged to ``generate_video``.

    Returns:
        A ``video/mp4`` response carrying the bytes of the rendered file,
        offered as the attachment ``output.mp4``.

    Raises:
        HTTPException: 400 when neither or both of ``images`` / ``video``
            are provided; 500 (with the error text as detail) when saving
            the uploads or rendering fails.
    """
    # Validate the mutually exclusive visual inputs.
    if not images and not video:
        raise HTTPException(400, "必须提供 images 或 video 中的一个")
    if images and video:
        raise HTTPException(400, "不能同时提供 images 和 video")

    # Scratch directory for this request; removed in the ``finally`` below.
    tmp_dir = tempfile.mkdtemp()
    try:
        # Persist the uploads to disk.
        image_paths = None
        video_path = None
        if images:
            image_paths = []
            for i, img in enumerate(images):
                path = os.path.join(tmp_dir, f"image_{i}{_upload_suffix(img, '.jpg')}")
                await _save_upload(img, path)
                image_paths.append(path)
        elif video:
            video_path = os.path.join(tmp_dir, "video" + _upload_suffix(video, ".mp4"))
            await _save_upload(video, video_path)

        audio_path = os.path.join(tmp_dir, "audio" + _upload_suffix(audio, ".wav"))
        await _save_upload(audio, audio_path)

        bgm_path = None
        if bgm:
            bgm_path = os.path.join(tmp_dir, "bgm" + _upload_suffix(bgm, ".mp3"))
            await _save_upload(bgm, bgm_path)

        output_path = os.path.join(tmp_dir, "output.mp4")

        # Call the main compositing function.
        # NOTE(review): this is a synchronous call inside an async handler;
        # if rendering is slow it will hold up the event loop -- confirm
        # whether it should be moved to a worker thread.
        generate_video(
            image_paths=image_paths,
            video_path=video_path,
            audio_path=audio_path,
            srt_text=srt_text,
            bgm_path=bgm_path,
            effect=effect,
            output_path=output_path,
            lut=lut,
            ken_burns=ken_burns,
            typewriter=typewriter,
            cinemascope=cinemascope,
            title_text=title_text,
            title_style=title_style,
            title_color=title_color,
            title_animation=title_animation,
            title_duration=title_duration,
            title_position=title_position,
            title_start=title_start,
            title_font_size=title_font_size,
            typewriter_font_size=typewriter_font_size,
            fps=fps,
        )

        # Load the result into memory so the scratch directory can be
        # deleted before the response is sent.
        with open(output_path, "rb") as f:
            video_bytes = f.read()
    except Exception as e:
        print(traceback.format_exc(), file=sys.stderr)
        raise HTTPException(500, detail=str(e)) from e
    finally:
        # Single cleanup point for both the success and the failure path.
        shutil.rmtree(tmp_dir, ignore_errors=True)

    return Response(
        content=video_bytes,
        media_type="video/mp4",
        headers={"Content-Disposition": "attachment; filename=output.mp4"},
    )


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)