# express/backend/main.py
# Raven10492's picture
# Upload 7 files
# 7dd9c94 verified
import uvicorn
from fastapi import FastAPI, File, UploadFile, Form, HTTPException
from fastapi.responses import FileResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from typing import Optional
import services.vertex_service as vertex_service
import services.video_service as video_service
import services.image_service as image_service # Import the new service
import os
import shutil
from datetime import datetime
# ASGI application object; the route decorators and the static mount below
# all register themselves on this instance.
app = FastAPI()
@app.post("/api/generate_video")
async def generate_video(
    api_key: str = Form(...),
    prompt: str = Form(...),
    negative_prompt: Optional[str] = Form(None),
    model: str = Form(...),
    aspect_ratio: str = Form(...),
    duration: int = Form(...),
    resolution: str = Form(...),
    sample_count: int = Form(...),
    seed: Optional[int] = Form(None),
    person_generation: str = Form(...),
    generate_audio: bool = Form(...),
    enhance_prompt: bool = Form(...),
    image: Optional[UploadFile] = File(None),
    video: Optional[UploadFile] = File(None),
    # Form fields controlling optional pre-processing of the uploaded image.
    image_process_mode: Optional[str] = Form(None),
    image_fill_color: Optional[str] = Form("#000000"),
):
    """Start a video generation job and return URLs of the resulting videos.

    The form fields are collected into a parameter dict, optional image/video
    uploads are attached (the image is first passed through
    ``image_service.process_image`` when ``image_process_mode`` is set to
    anything other than ``'none'``), and the request is handed to
    ``vertex_service``. The videos returned by polling are stored through
    ``video_service.save_video`` and exposed under ``/api/video/<file name>``.

    Returns:
        ``{"video_urls": [...]}`` with one URL per saved video file.

    Raises:
        HTTPException: an ``HTTPException`` raised by any step is propagated
            unchanged; every other failure is reported as status 400 with the
            error text as ``detail``.
    """
    try:
        project_id = await vertex_service.get_project_id(api_key)

        params = {
            "prompt": prompt,
            "negativePrompt": negative_prompt,
            "model": model,
            "aspectRatio": aspect_ratio,
            "durationSeconds": duration,
            "resolution": resolution,
            "sampleCount": sample_count,
            "seed": seed,
            "personGeneration": person_generation,
            "generateAudio": generate_audio,
            "enhancePrompt": enhance_prompt,
        }

        if image:
            image_bytes = await image.read()
            # Image processing is available for every aspect ratio.
            if image_process_mode and image_process_mode != 'none':
                params["image"] = image_service.process_image(
                    image_bytes=image_bytes,
                    mode=image_process_mode,
                    fill_color=image_fill_color,
                    target_aspect_ratio=aspect_ratio,  # fit to the requested ratio
                )
                # Per the original note the service always returns JPEG data,
                # so the uploaded file's content type no longer applies.
                params["image_mime_type"] = "image/jpeg"
            else:
                # No processing requested: forward the upload untouched.
                params["image"] = image_bytes
                params["image_mime_type"] = image.content_type

        if video:
            params["video"] = await video.read()
            params["video_mime_type"] = video.content_type

        operation_name = await vertex_service.start_video_generation(
            project_id, api_key, params
        )
        video_data = await vertex_service.poll_video_status(
            project_id, model, operation_name, api_key
        )
        video_paths = await video_service.save_video(video_data)

        # The frontend fetches each file through the /api/video/{video_id} route.
        video_urls = [f"/api/video/{os.path.basename(p)}" for p in video_paths]
        return {"video_urls": video_urls}
    except HTTPException:
        # Keep the status code chosen by whoever raised it instead of
        # flattening it to 400 below.
        raise
    except Exception as e:
        # Chain the cause so the original traceback stays visible in logs.
        raise HTTPException(status_code=400, detail=str(e)) from e
@app.get("/api/video/{video_id}")
async def get_video(video_id: str):
    """Serve a previously generated video file from ``video_service.VIDEO_DIR``.

    Args:
        video_id: File name of the video, as produced in the ``video_urls``
            returned by the generation endpoint.

    Returns:
        A ``FileResponse`` streaming the file.

    Raises:
        HTTPException: 404 when no regular file with that name exists.
    """
    # Reduce the id to its final path component so it cannot point outside
    # VIDEO_DIR (e.g. via backslash-separated segments on Windows).
    file_name = os.path.basename(video_id)
    video_path = os.path.join(video_service.VIDEO_DIR, file_name)
    # isfile (not exists): a directory such as ".." must yield 404 rather than
    # reaching FileResponse, which cannot serve a directory.
    if os.path.isfile(video_path):
        return FileResponse(video_path)
    raise HTTPException(status_code=404, detail="Video not found")
@app.post("/api/save_video")
async def save_video_to_server(video_url: str = Form(...)):
    """Copy a generated video into the local ``output`` directory.

    Args:
        video_url: URL (or path) of the video; only its final component is
            used to locate the file inside ``video_service.VIDEO_DIR``.

    Returns:
        A ``JSONResponse`` with a success message and the destination path.

    Raises:
        HTTPException: 404 when the source video does not exist, 500 when
            creating the directory or copying the file fails.
    """
    try:
        video_id = os.path.basename(video_url)
        source_path = os.path.join(video_service.VIDEO_DIR, video_id)
        # isfile (not exists): an empty basename would otherwise resolve to
        # VIDEO_DIR itself and fail later inside shutil.copy.
        if not os.path.isfile(source_path):
            raise HTTPException(status_code=404, detail="Video not found")

        output_dir = "output"
        os.makedirs(output_dir, exist_ok=True)

        # NOTE: second-resolution names mean two saves within the same second
        # write to the same destination file.
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        filename = f"{timestamp}.mp4"
        destination_path = os.path.join(output_dir, filename)

        shutil.copy(source_path, destination_path)
        return JSONResponse(content={"message": "Video saved successfully", "path": destination_path})
    except HTTPException:
        # Without this the 404 above would be caught by the generic handler
        # and reported to the client as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
# Serve the frontend files at the site root. The relative "../frontend" path
# is resolved against the process working directory. This mount is registered
# after the /api routes — presumably so the catch-all "/" does not shadow them.
app.mount("/", StaticFiles(directory="../frontend", html=True), name="static")
# When the file is executed directly, run the app with uvicorn on all
# network interfaces (0.0.0.0), port 8000.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)