Spaces:
Running
Running
| import uvicorn | |
| from fastapi import FastAPI, File, UploadFile, Form, HTTPException | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from typing import Optional | |
| import services.vertex_service as vertex_service | |
| import services.video_service as video_service | |
| import services.image_service as image_service # Import the new service | |
| import os | |
| import shutil | |
| from datetime import datetime | |
# ASGI application object for this module; the static frontend is mounted on
# it and it is handed to uvicorn at the bottom of the file.
app = FastAPI()
async def generate_video(
    api_key: str = Form(...),
    prompt: str = Form(...),
    negative_prompt: Optional[str] = Form(None),
    model: str = Form(...),
    aspect_ratio: str = Form(...),
    duration: int = Form(...),
    resolution: str = Form(...),
    sample_count: int = Form(...),
    seed: Optional[int] = Form(None),
    person_generation: str = Form(...),
    generate_audio: bool = Form(...),
    enhance_prompt: bool = Form(...),
    image: Optional[UploadFile] = File(None),
    video: Optional[UploadFile] = File(None),
    # Form fields controlling optional pre-processing of the uploaded image
    image_process_mode: Optional[str] = Form(None),
    image_fill_color: Optional[str] = Form("#000000"),
):
    """Start a video generation job, wait for it, and return the result URLs.

    The form fields are collected into a parameter dict and passed to
    ``vertex_service.start_video_generation``; the resulting operation is
    polled with ``vertex_service.poll_video_status`` and the returned data is
    stored through ``video_service.save_video``.

    An uploaded ``image`` is attached either unchanged (with its own content
    type) or, when ``image_process_mode`` is set to something other than
    ``"none"``, after being run through ``image_service.process_image``.
    An uploaded ``video`` is attached unchanged.

    Returns:
        ``{"video_urls": [...]}`` with one ``/api/video/<file name>`` entry
        per path returned by ``video_service.save_video``.

    Raises:
        HTTPException: status 400, with the message of whatever exception
            was raised while handling the request.
    """
    try:
        project_id = await vertex_service.get_project_id(api_key)

        params = {
            "prompt": prompt,
            "negativePrompt": negative_prompt,
            "model": model,
            "aspectRatio": aspect_ratio,
            "durationSeconds": duration,
            "resolution": resolution,
            "sampleCount": sample_count,
            "seed": seed,
            "personGeneration": person_generation,
            "generateAudio": generate_audio,
            "enhancePrompt": enhance_prompt,
        }

        if image:
            raw_image = await image.read()
            # Image processing is available for every aspect ratio.
            skip_processing = not image_process_mode or image_process_mode == 'none'
            if skip_processing:
                # No mode selected: forward the upload exactly as received.
                image_payload = raw_image
                image_mime = image.content_type
            else:
                image_payload = image_service.process_image(
                    image_bytes=raw_image,
                    mode=image_process_mode,
                    fill_color=image_fill_color,
                    target_aspect_ratio=aspect_ratio,  # pass the target aspect ratio
                )
                # Per the original author's note, the service always returns JPEG.
                image_mime = "image/jpeg"
            params["image"] = image_payload
            params["image_mime_type"] = image_mime

        if video:
            video_payload = await video.read()
            params["video"] = video_payload
            params["video_mime_type"] = video.content_type

        operation_name = await vertex_service.start_video_generation(
            project_id, api_key, params
        )
        video_data = await vertex_service.poll_video_status(
            project_id, model, operation_name, api_key
        )
        saved_paths = await video_service.save_video(video_data)

        # The frontend only needs the file name part of each saved path.
        return {
            "video_urls": [
                f"/api/video/{os.path.basename(saved)}" for saved in saved_paths
            ]
        }
    except Exception as exc:
        raise HTTPException(status_code=400, detail=str(exc))
async def get_video(video_id: str):
    """Return the stored video file named ``video_id``.

    NOTE(review): the route decorator is not visible in this view; presumably
    this is served at ``/api/video/{video_id}``, matching the URLs built by
    ``generate_video`` -- confirm.

    Args:
        video_id: File name of a video inside ``video_service.VIDEO_DIR``.

    Returns:
        A ``FileResponse`` for the file.

    Raises:
        HTTPException: status 404 when ``video_id`` is empty, contains
            directory components, or does not name a regular file in
            ``video_service.VIDEO_DIR``.
    """
    # video_id is caller-supplied: accept only a bare file name so the joined
    # path cannot point outside VIDEO_DIR.
    if video_id and os.path.basename(video_id) == video_id:
        video_path = os.path.join(video_service.VIDEO_DIR, video_id)
        # isfile() rather than exists(): a directory (e.g. "..") exists but
        # cannot be served as a file.
        if os.path.isfile(video_path):
            return FileResponse(video_path)
    raise HTTPException(status_code=404, detail="Video not found")
async def save_video_to_server(video_url: str = Form(...)):
    """Copy a generated video into the local ``output`` directory.

    Args:
        video_url: URL (or path) of a generated video; only its final path
            component is used to locate the file in
            ``video_service.VIDEO_DIR``.

    Returns:
        A ``JSONResponse`` containing a success message and the destination
        path of the copy.

    Raises:
        HTTPException: status 404 when the video is not a regular file in
            ``video_service.VIDEO_DIR``; status 500 when creating the output
            directory or copying the file fails.
    """
    video_id = os.path.basename(video_url)
    source_path = os.path.join(video_service.VIDEO_DIR, video_id)
    # Validate outside the try block: previously the 404 raised here was
    # caught by the generic handler below and reported as a 500.
    # isfile() also rejects an empty name, which would otherwise resolve to
    # VIDEO_DIR itself and fail later inside shutil.copy.
    if not video_id or not os.path.isfile(source_path):
        raise HTTPException(status_code=404, detail="Video not found")

    try:
        output_dir = "output"
        os.makedirs(output_dir, exist_ok=True)
        # NOTE(review): the name has one-second resolution, so two saves in
        # the same second write to the same destination file.
        timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        filename = f"{timestamp}.mp4"
        destination_path = os.path.join(output_dir, filename)
        shutil.copy(source_path, destination_path)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

    return JSONResponse(
        content={"message": "Video saved successfully", "path": destination_path}
    )
# Serve the frontend from the application root. The directory is given as a
# relative path, so it is resolved against the process working directory.
_frontend_files = StaticFiles(directory="../frontend", html=True)
app.mount("/", _frontend_files, name="static")


if __name__ == "__main__":
    # Running this file directly starts uvicorn on all interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)