"""FastAPI application exposing video generation, retrieval and save endpoints.

The API routes delegate to the ``vertex_service``, ``video_service`` and
``image_service`` modules; everything else under ``/`` is served as static
files from ``../frontend``.
"""

import os
import shutil
from datetime import datetime
from typing import Optional

import uvicorn
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles

import services.vertex_service as vertex_service
import services.video_service as video_service
import services.image_service as image_service  # Import the new service

app = FastAPI()


def _resolve_video_path(name: str) -> Optional[str]:
    """Return the path of the stored video called *name*, or ``None``.

    Only the final path component of *name* is used, so the lookup can never
    leave ``video_service.VIDEO_DIR``. Directories (for example ``..``) and
    empty names are treated as "not found" instead of being handed on to code
    that expects a regular file.
    """
    video_id = os.path.basename(name)
    if not video_id:
        return None
    candidate = os.path.join(video_service.VIDEO_DIR, video_id)
    return candidate if os.path.isfile(candidate) else None


@app.post("/api/generate_video")
async def generate_video(
    api_key: str = Form(...),
    prompt: str = Form(...),
    negative_prompt: Optional[str] = Form(None),
    model: str = Form(...),
    aspect_ratio: str = Form(...),
    duration: int = Form(...),
    resolution: str = Form(...),
    sample_count: int = Form(...),
    seed: Optional[int] = Form(None),
    person_generation: str = Form(...),
    generate_audio: bool = Form(...),
    enhance_prompt: bool = Form(...),
    image: Optional[UploadFile] = File(None),
    video: Optional[UploadFile] = File(None),
    # Form fields controlling optional image pre-processing
    image_process_mode: Optional[str] = Form(None),
    image_fill_color: Optional[str] = Form("#000000"),
):
    """Start a video generation job, wait for it and return the result URLs.

    The form fields are forwarded to ``vertex_service`` as a parameter dict.
    An uploaded ``image`` is attached either as-is or, when
    ``image_process_mode`` is set to something other than ``'none'``, after
    being passed through ``image_service.process_image``. An uploaded
    ``video`` is attached unchanged.

    Returns:
        ``{"video_urls": [...]}`` with one ``/api/video/<file>`` URL per path
        returned by ``video_service.save_video``.

    Raises:
        HTTPException: status 400 carrying the error text for any failure
            that is not already an ``HTTPException``.
    """
    try:
        project_id = await vertex_service.get_project_id(api_key)

        params = {
            "prompt": prompt,
            "negativePrompt": negative_prompt,
            "model": model,
            "aspectRatio": aspect_ratio,
            "durationSeconds": duration,
            "resolution": resolution,
            "sampleCount": sample_count,
            "seed": seed,
            "personGeneration": person_generation,
            "generateAudio": generate_audio,
            "enhancePrompt": enhance_prompt,
        }

        if image:
            image_bytes = await image.read()
            # Image processing is available for every aspect ratio.
            if image_process_mode and image_process_mode != 'none':
                # A processing mode was selected: send the processed image.
                processed_image_bytes = image_service.process_image(
                    image_bytes=image_bytes,
                    mode=image_process_mode,
                    fill_color=image_fill_color,
                    target_aspect_ratio=aspect_ratio,  # pass the target aspect ratio
                )
                params["image"] = processed_image_bytes
                params["image_mime_type"] = "image/jpeg"  # Service always returns JPEG
            else:
                # No processing requested: send the upload untouched.
                params["image"] = image_bytes
                params["image_mime_type"] = image.content_type

        if video:
            params["video"] = await video.read()
            params["video_mime_type"] = video.content_type

        model_id = params["model"]
        operation_name = await vertex_service.start_video_generation(project_id, api_key, params)
        video_data = await vertex_service.poll_video_status(project_id, model_id, operation_name, api_key)

        video_paths = await video_service.save_video(video_data)

        # Create a list of URLs for the frontend
        video_urls = [f"/api/video/{os.path.basename(p)}" for p in video_paths]

        return {"video_urls": video_urls}
    except HTTPException:
        # Keep the status code of errors that are already HTTP errors rather
        # than flattening them into a generic 400.
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e)) from e


@app.get("/api/video/{video_id}")
async def get_video(video_id: str):
    """Serve a previously generated video file by its file name.

    Raises:
        HTTPException: status 404 when no regular file with that name exists
            in ``video_service.VIDEO_DIR``.
    """
    video_path = _resolve_video_path(video_id)
    if video_path is not None:
        return FileResponse(video_path)
    raise HTTPException(status_code=404, detail="Video not found")


@app.post("/api/save_video")
async def save_video_to_server(video_url: str = Form(...)):
    """Copy a generated video into the ``output`` directory.

    The last path component of ``video_url`` identifies the source file in
    ``video_service.VIDEO_DIR``. The copy is named after the current local
    time (``YYYY-MM-DD_HH-MM-SS.mp4``); if that name is already taken a
    numeric suffix is appended so an earlier save is never overwritten.

    Returns:
        A JSON response with a success message and the destination path.

    Raises:
        HTTPException: status 404 when the source video does not exist,
            status 500 for any other failure while saving.
    """
    try:
        source_path = _resolve_video_path(video_url)
        if source_path is None:
            raise HTTPException(status_code=404, detail="Video not found")

        output_dir = "output"
        os.makedirs(output_dir, exist_ok=True)

        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        destination_path = os.path.join(output_dir, f"{timestamp}.mp4")
        # The timestamp only has one-second resolution and shutil.copy
        # overwrites silently, so pick a free name for same-second saves.
        suffix = 1
        while os.path.exists(destination_path):
            destination_path = os.path.join(output_dir, f"{timestamp}_{suffix}.mp4")
            suffix += 1

        shutil.copy(source_path, destination_path)

        return JSONResponse(content={"message": "Video saved successfully", "path": destination_path})
    except HTTPException:
        # Without this the 404 above would be caught below and reported
        # as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


# Mounted last so the API routes registered above are matched first.
app.mount("/", StaticFiles(directory="../frontend", html=True), name="static")

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)