Spaces:
Running
Running
| from fastapi import FastAPI, File, UploadFile, HTTPException, Form | |
| from fastapi.responses import Response | |
| import tempfile | |
| import os | |
| import traceback | |
| import sys | |
| from pathlib import Path | |
| from typing import Optional, List | |
| from modules.compositor import generate_video | |
| from modules.lut import LUTS | |
| from modules.particles import EFFECTS | |
| from modules.titles import TITLE_STYLES, TITLE_ANIMATIONS, TITLE_POSITIONS, _ensure_fonts | |
# Pre-download all brush-calligraphy fonts once, at import time, before the
# application object is created.
_ensure_fonts()

app = FastAPI(
    title="Video Generator with Effects API",
)
async def root():
    """Describe the API: a banner message plus every selectable option.

    Returns:
        A dict with the available particle effects, the LUT display names
        keyed by LUT id, the title styles/animations/positions, and the
        list of supported feature flags.
    """
    # NOTE(review): no route decorator (e.g. ``@app.get(...)``) is visible on
    # this handler in this view of the file -- confirm it is registered.
    lut_names = {}
    for lut_key, lut_info in LUTS.items():
        lut_names[lut_key] = lut_info["name"]

    supported_features = [
        "ken_burns",
        "typewriter",
        "cinemascope",
        "title",
        "video_input",
    ]

    return {
        "message": "Video Generator API",
        "effects": EFFECTS,
        "luts": lut_names,
        "title_styles": TITLE_STYLES,
        "title_animations": TITLE_ANIMATIONS,
        "title_positions": TITLE_POSITIONS,
        "features": supported_features,
    }
async def health():
    """Report that the service is up.

    Returns:
        The constant payload ``{"status": "healthy"}``.
    """
    # NOTE(review): no route decorator is visible on this handler in this
    # view of the file -- confirm it is registered with the app.
    status_payload = {"status": "healthy"}
    return status_payload
async def _save_upload(upload, directory, stem, default_suffix):
    """Write one uploaded file into *directory* and return the saved path.

    The file is named ``stem`` plus the extension of the client-supplied
    filename. When the filename is missing or has no extension,
    *default_suffix* is used instead, so ``Path(None)`` is never evaluated.
    """
    suffix = Path(upload.filename).suffix if upload.filename else ""
    path = os.path.join(directory, f"{stem}{suffix or default_suffix}")
    with open(path, "wb") as f:
        f.write(await upload.read())
    return path


async def generate(
    images: List[UploadFile] = File(None),
    video: UploadFile = File(None),
    audio: UploadFile = File(...),
    srt_text: str = Form(""),
    bgm: Optional[UploadFile] = File(None),
    effect: str = Form("snow"),
    lut: str = Form("none"),
    ken_burns: bool = Form(False),
    typewriter: bool = Form(False),
    cinemascope: bool = Form(False),
    title_text: str = Form(""),
    title_style: str = Form("brush_muyao"),
    title_color: str = Form("white"),
    title_animation: str = Form("typewriter"),
    title_duration: float = Form(4.0),
    title_position: str = Form("center"),
    title_start: float = Form(0.0),
    title_font_size: int = Form(0),
    typewriter_font_size: int = Form(0),
    fps: int = Form(25),
):
    """Render a video from the uploaded media and return it as an MP4.

    Exactly one of ``images`` or ``video`` must be supplied as the visual
    source. All uploads are written to a private temporary directory, the
    rendering is delegated to ``generate_video``, and the resulting
    ``output.mp4`` is read back and returned. The temporary directory is
    removed on every exit path.

    Args:
        images: Uploaded image files (mutually exclusive with ``video``).
        video: Uploaded video file (mutually exclusive with ``images``).
        audio: Required uploaded audio file.
        srt_text: Subtitle text, forwarded unchanged to ``generate_video``.
        bgm: Optional uploaded background-music file.
        effect, lut, ken_burns, typewriter, cinemascope, title_text,
        title_style, title_color, title_animation, title_duration,
        title_position, title_start, title_font_size,
        typewriter_font_size, fps: Rendering options, forwarded unchanged
            to ``generate_video``.

    Returns:
        A ``Response`` with media type ``video/mp4`` served as the
        attachment ``output.mp4``.

    Raises:
        HTTPException: 400 when neither or both of ``images`` / ``video``
            are given; 500 (detail is the error text) when saving the
            uploads or rendering fails.
    """
    # Function-scope import, as in the original handler, so the block does
    # not depend on a module-level ``import shutil``.
    import shutil

    # Validate the visual source before touching the filesystem.
    if not images and not video:
        raise HTTPException(400, "必须提供 images 或 video 中的一个")
    if images and video:
        raise HTTPException(400, "不能同时提供 images 和 video")

    tmp_dir = tempfile.mkdtemp()
    try:
        # Persist the uploads; exactly one of image_paths / video_path is set.
        image_paths = None
        video_path = None
        if images:
            image_paths = [
                await _save_upload(img, tmp_dir, f"image_{i}", ".jpg")
                for i, img in enumerate(images)
            ]
        else:
            video_path = await _save_upload(video, tmp_dir, "video", ".mp4")

        audio_path = await _save_upload(audio, tmp_dir, "audio", ".wav")
        bgm_path = None
        if bgm:
            bgm_path = await _save_upload(bgm, tmp_dir, "bgm", ".mp3")

        output_path = os.path.join(tmp_dir, "output.mp4")

        # NOTE(review): generate_video is invoked without ``await``, so it
        # runs inline in this coroutine; if it is long-running, consider
        # off-loading it to a worker thread after confirming thread-safety.
        generate_video(
            image_paths=image_paths,
            video_path=video_path,
            audio_path=audio_path,
            srt_text=srt_text,
            bgm_path=bgm_path,
            effect=effect,
            output_path=output_path,
            lut=lut,
            ken_burns=ken_burns,
            typewriter=typewriter,
            cinemascope=cinemascope,
            title_text=title_text,
            title_style=title_style,
            title_color=title_color,
            title_animation=title_animation,
            title_duration=title_duration,
            title_position=title_position,
            title_start=title_start,
            title_font_size=title_font_size,
            typewriter_font_size=typewriter_font_size,
            fps=fps,
        )

        # Read the result into memory so the temp directory can be deleted
        # before the response is sent.
        with open(output_path, "rb") as f:
            video_bytes = f.read()
    except HTTPException:
        # Never re-wrap an HTTP error that already carries its own status.
        raise
    except Exception as e:
        print(traceback.format_exc(), file=sys.stderr)
        raise HTTPException(500, detail=str(e)) from e
    finally:
        # Single cleanup point: runs on success, failure and cancellation.
        shutil.rmtree(tmp_dir, ignore_errors=True)

    return Response(
        content=video_bytes,
        media_type="video/mp4",
        headers={"Content-Disposition": "attachment; filename=output.mp4"},
    )
if __name__ == "__main__":
    # Script entry point: serve the app on all interfaces, port 7860.
    # uvicorn is imported here so that importing this module as a library
    # does not require it.
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=7860,
    )