# Hugging Face Space (status when captured: Sleeping)
import asyncio
import os
import shlex
import subprocess
import uuid
from urllib.parse import urlparse

import requests
from fastapi import FastAPI
from fastapi.responses import FileResponse
from pydantic import BaseModel
# Directory that holds both the downloaded inputs and the rendered videos.
FILES_DIR = "/app/files"
# Create the directory at import time; exist_ok=True makes this a no-op when
# it is already present.
os.makedirs(FILES_DIR, exist_ok=True)
# FastAPI application object for this service.
app = FastAPI()
class GenerateVideoRequest(BaseModel):
    # Request payload for generate_video: two source URLs plus the FFmpeg
    # command template that combines them into one output file.
    image_url: str  # URL the image input is downloaded from
    audio_url: str  # URL the audio input is downloaded from
    ffmpeg_cmd: str  # Required placeholders: {image}, {audio}, {output}
def get_extension_from_url(url):
    """Return the file extension found in the path component of *url*.

    Only the parsed path is inspected, so the query string and fragment
    never contribute.  The extension keeps its leading dot (".png"); an
    empty string is returned when the path has none.
    """
    _, extension = os.path.splitext(urlparse(url).path)
    return extension or ''
def download_file(url, dest_path):
    """Download *url* to *dest_path* and report whether it succeeded.

    The body is streamed to disk in chunks so a large file is never held
    in memory in full.  Data is first written to ``<dest_path>.part`` and
    only renamed onto *dest_path* once the transfer is complete, so a
    failed download never leaves a truncated file at *dest_path*.

    Args:
        url: HTTP(S) URL to fetch.
        dest_path: Filesystem path the downloaded bytes are written to.

    Returns:
        True when the file was fully written, False on any network, HTTP
        status, or filesystem error (the error is printed).
    """
    # NOTE(review): the URL is fetched as given. If it comes from untrusted
    # clients, this lets them make the server request internal hosts (SSRF);
    # consider an allow-list of hosts/schemes.
    chunk_size = 64 * 1024
    partial_path = f"{dest_path}.part"
    try:
        # The context manager releases the connection even if writing fails.
        with requests.get(url, stream=True, timeout=20) as response:
            response.raise_for_status()
            with open(partial_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    f.write(chunk)
        os.replace(partial_path, dest_path)
    except (requests.RequestException, OSError) as e:
        print(f"Failed to download {url}: {e}")
        try:
            os.remove(partial_path)
        except OSError:
            # Nothing was written (or it is already gone); nothing to clean.
            pass
        return False
    return True
async def generate_video(req: GenerateVideoRequest):
    """Download the image and audio inputs, run FFmpeg, and return a download link.

    ``req.ffmpeg_cmd`` is a command template containing the placeholders
    ``{image}``, ``{audio}`` and ``{output}``.  The template is split into
    arguments first and the placeholders are substituted per argument
    afterwards, so characters inside the generated file names (the file
    extension is taken from the client-supplied URL) can never add or alter
    arguments.  Only commands whose executable is ``ffmpeg`` are run.

    The blocking download and FFmpeg steps run in the default executor so
    the event loop stays responsive.  The downloaded inputs are always
    removed afterwards; the output file is kept only on success.

    Args:
        req: Request with the two source URLs and the FFmpeg command template.

    Returns:
        ``{"status": "success", "download_url": ...}`` on success, otherwise
        a dict with an ``"error"`` key (plus ``stdout``/``stderr``/``cmd``
        when FFmpeg itself failed).
    """
    # NOTE(review): no route decorator is visible on this function in the
    # reviewed source; confirm it is registered on `app` (e.g. @app.post(...)).
    image_ext = get_extension_from_url(req.image_url)
    audio_ext = get_extension_from_url(req.audio_url)
    image_file = os.path.join(FILES_DIR, f"{uuid.uuid4()}{image_ext}")
    audio_file = os.path.join(FILES_DIR, f"{uuid.uuid4()}{audio_ext}")
    output_file = os.path.join(FILES_DIR, f"{uuid.uuid4()}.mp4")
    paths = {"image": image_file, "audio": audio_file, "output": output_file}

    # Validate the command before doing any network work.
    try:
        final_cmd_list = [
            token.format(**paths) for token in shlex.split(req.ffmpeg_cmd)
        ]
    except (ValueError, KeyError, IndexError, AttributeError, TypeError) as e:
        # Unbalanced quotes, unknown placeholders, or malformed format fields.
        return {"error": f"Invalid ffmpeg_cmd: {e}"}
    if not final_cmd_list or os.path.basename(final_cmd_list[0]) != "ffmpeg":
        # Refuse to execute arbitrary client-chosen programs.
        # NOTE(review): ffmpeg options can still read/write arbitrary paths
        # and URLs; consider restricting the allowed options as well.
        return {"error": "ffmpeg_cmd must invoke ffmpeg"}
    final_cmd = " ".join(shlex.quote(token) for token in final_cmd_list)

    loop = asyncio.get_running_loop()
    succeeded = False
    try:
        for url, dest in ((req.image_url, image_file), (req.audio_url, audio_file)):
            if not await loop.run_in_executor(None, download_file, url, dest):
                return {"error": f"Failed to download {url}"}
        try:
            await loop.run_in_executor(None, _run_ffmpeg, final_cmd_list)
        except subprocess.CalledProcessError as e:
            return {
                "error": "FFmpeg failed",
                "stdout": e.stdout,
                "stderr": e.stderr,
                "cmd": final_cmd
            }
        except OSError as e:
            # The ffmpeg executable is missing or cannot be started.
            return {
                "error": "FFmpeg failed",
                "stdout": "",
                "stderr": str(e),
                "cmd": final_cmd
            }
        succeeded = True
    finally:
        # Inputs are never served back to the client, so always drop them;
        # drop a partial output too when the run did not succeed.
        leftovers = [image_file, audio_file]
        if not succeeded:
            leftovers.append(output_file)
        for path in leftovers:
            _remove_quietly(path)

    download_url = f"https://hivecorp-video-gen-2.hf.space/files/{os.path.basename(output_file)}"
    return {"status": "success", "download_url": download_url}


def _run_ffmpeg(argv):
    """Run *argv* without a shell, raising CalledProcessError on a non-zero exit."""
    return subprocess.run(argv, capture_output=True, text=True, check=True)


def _remove_quietly(path):
    """Delete *path*, ignoring the case where it does not exist or cannot be removed."""
    try:
        os.remove(path)
    except OSError:
        pass
async def get_file(filename: str):
    """Serve a previously generated file from FILES_DIR.

    The requested name is resolved to a real path and must stay inside
    FILES_DIR, so names such as ``../secret`` or an absolute path cannot
    reach files outside the storage directory.  Only regular files are
    served.

    Args:
        filename: Name of the file inside FILES_DIR.

    Returns:
        A FileResponse with media type ``video/mp4`` when the file exists,
        otherwise ``{"error": "File not found"}``.
    """
    # NOTE(review): no route decorator is visible on this function in the
    # reviewed source; confirm it is registered on `app`
    # (presumably at "/files/{filename}", matching the generated download URL).
    base_dir = os.path.realpath(FILES_DIR)
    try:
        file_path = os.path.realpath(os.path.join(base_dir, filename))
    except ValueError:
        # e.g. an embedded NUL byte in the name: treat as a missing file.
        return {"error": "File not found"}
    inside_base = file_path.startswith(base_dir + os.sep)
    if inside_base and os.path.isfile(file_path):
        return FileResponse(file_path, media_type='video/mp4')
    return {"error": "File not found"}