| from fastapi import FastAPI, HTTPException |
| from fastapi.staticfiles import StaticFiles |
| from pydantic import BaseModel, HttpUrl |
| from dotenv import load_dotenv |
| from typing import List |
| import os |
| import asyncio |
| import uuid |
| import aiohttp |
| import re |
| import shutil |
| import aiofiles |
|
|
| load_dotenv() |
|
|
| |
| app = FastAPI() |
|
|
| |
| os.makedirs("staticfiles", exist_ok=True) |
| app.mount("/static", StaticFiles(directory="staticfiles"), name="static") |
|
|
| |
| class SlideshowRequest(BaseModel): |
| image_urls: List[HttpUrl] |
| audio_url: HttpUrl |
| duration: int |
| zoom: bool = False |
|
|
| def extract_google_drive_id(url): |
| """Extract file ID from a Google Drive URL""" |
| pattern = r'(?:/file/d/|id=|/open\?id=)([^/&]+)' |
| match = re.search(pattern, str(url)) |
| return match.group(1) if match else None |
|
|
| async def download_file(url, local_path): |
| """Download a file from URL to local path asynchronously""" |
| try: |
| |
| if "drive.google.com" in str(url): |
| file_id = extract_google_drive_id(url) |
| if file_id: |
| url = f"https://drive.google.com/uc?export=download&id={file_id}" |
| |
| async with aiohttp.ClientSession() as session: |
| async with session.get(str(url)) as response: |
| response.raise_for_status() |
| |
| async with aiofiles.open(local_path, 'wb') as f: |
| while True: |
| chunk = await response.content.read(8192) |
| if not chunk: |
| break |
| await f.write(chunk) |
| return True |
| except Exception as e: |
| print(f"Error downloading {url}: {str(e)}") |
| return False |
|
|
| async def upload_video(url): |
| """Upload a video and return the upload URL""" |
| try: |
| data = { |
| 'file': str(url), |
| 'upload_preset': 'video-input-production', |
| } |
| |
| async with aiohttp.ClientSession() as session: |
| async with session.post( |
| os.getenv("VIDEO_UPLOAD_URL"), |
| data=data |
| ) as response: |
| if response.status != 200: |
| print(f"Upload failed with status {response.status}") |
| return None |
| |
| result = await response.json() |
| return result.get('secure_url') |
| |
| except Exception as e: |
| print(f"Error uploading video: {str(e)}") |
| return None |
|
|
| async def create_slideshow(image_paths, audio_path, output_path, duration, zoom=False, zoom_ratio=0.04): |
| """Generate slideshow from images and audio using ffmpeg asynchronously""" |
| |
| concat_file = "temp_concat.txt" |
| |
| if not zoom: |
| |
| async with aiofiles.open(concat_file, "w") as f: |
| for img in image_paths: |
| await f.write(f"file '{img}'\n") |
| await f.write(f"duration {duration}\n") |
| |
| |
| if image_paths: |
| await f.write(f"file '{image_paths[-1]}'\n") |
| |
| |
| total_duration = len(image_paths) * duration |
| cmd = [ |
| "ffmpeg", |
| "-f", "concat", |
| "-safe", "0", |
| "-i", concat_file, |
| "-i", audio_path, |
| "-c:v", "libx264", |
| "-pix_fmt", "yuv420p", |
| "-c:a", "aac", |
| "-shortest", |
| "-y", |
| "-t", str(total_duration), |
| output_path |
| ] |
| else: |
| |
| |
| filters = [] |
| for i, img in enumerate(image_paths): |
| filter_str = ( |
| f"[{i}:v]scale=1920:1080:force_original_aspect_ratio=decrease," |
| f"pad=1920:1080:(ow-iw)/2:(oh-ih)/2,setsar=1," |
| f"zoompan=z='min(zoom+0.0015,1.5)':d={duration*25}:" |
| f"x='iw/2-(iw/zoom/2)':y='ih/2-(ih/zoom/2)'," |
| f"trim=duration={duration},setpts=PTS-STARTPTS[v{i}];" |
| ) |
| filters.append(filter_str) |
| |
| |
| filter_complex = "".join(filters) |
| for i in range(len(image_paths)): |
| filter_complex += f"[v{i}]" |
| filter_complex += f"concat=n={len(image_paths)}:v=1:a=0[outv]" |
| |
| cmd = ["ffmpeg", "-y"] |
| |
| for img in image_paths: |
| cmd.extend(["-loop", "1", "-i", img]) |
| |
| cmd.extend([ |
| "-i", audio_path, |
| "-filter_complex", filter_complex, |
| "-map", "[outv]", |
| "-map", f"{len(image_paths)}:a", |
| "-c:v", "libx264", |
| "-pix_fmt", "yuv420p", |
| "-c:a", "aac", |
| "-shortest", |
| output_path |
| ]) |
| try: |
| process = await asyncio.create_subprocess_exec( |
| *cmd, |
| stdout=asyncio.subprocess.PIPE, |
| stderr=asyncio.subprocess.PIPE |
| ) |
| stdout, stderr = await process.communicate() |
| |
| if os.path.exists(concat_file): |
| os.remove(concat_file) |
| |
| if process.returncode != 0: |
| print(f"FFmpeg error: {stderr.decode()}") |
| return False |
| return True |
| except Exception as e: |
| print(f"FFmpeg error: {str(e)}") |
| if os.path.exists(concat_file): |
| os.remove(concat_file) |
| return False |
|
|
| @app.post("/make_slideshow") |
| async def make_slideshow(request: SlideshowRequest): |
| """ |
| Create a slideshow from images and audio with specified duration per image. |
| Returns the URL of the generated video. |
| """ |
| |
| request_id = str(uuid.uuid4()) |
| request_dir = os.path.join("staticfiles", request_id) |
| os.makedirs(request_dir, exist_ok=True) |
| |
| try: |
| |
| image_paths = [] |
| download_tasks = [] |
| |
| for i, url in enumerate(request.image_urls): |
| image_path = os.path.join(request_dir, f"image_{i:03d}.png") |
| image_paths.append(image_path) |
| download_tasks.append(download_file(url, image_path)) |
| |
| |
| audio_path = os.path.join(request_dir, "audio.mp3") |
| download_tasks.append(download_file(request.audio_url, audio_path)) |
| |
| |
| results = await asyncio.gather(*download_tasks) |
| |
| |
| if not all(results[:-1]): |
| raise HTTPException(status_code=400, detail="Failed to download one or more images") |
| |
| if not results[-1]: |
| raise HTTPException(status_code=400, detail=f"Failed to download audio: {request.audio_url}") |
| |
| |
| output_path = os.path.join(request_dir, "slideshow.mp4") |
| |
| |
| if not await create_slideshow(image_paths, audio_path, output_path, request.duration, request.zoom): |
| raise HTTPException(status_code=500, detail="Failed to create slideshow") |
| |
| |
| base_url = "https://saq1b-api.hf.space/static" |
| video_url = f"{base_url}/{request_id}/slideshow.mp4" |
| return {"url": video_url} |
| |
| except Exception as e: |
| |
| if os.path.exists(request_dir): |
| shutil.rmtree(request_dir) |
| raise HTTPException(status_code=500, detail=f"Error: {str(e)}") |
|
|
| if __name__ == "__main__": |
| import uvicorn |
| uvicorn.run(app, host="0.0.0.0", port=7860) |