Spaces:
Sleeping
Sleeping
| import os | |
| import uuid | |
| import shutil | |
| import aiohttp | |
| import requests | |
| import traceback | |
| import subprocess | |
| from typing import List | |
| from pathlib import Path | |
| from pydantic import BaseModel | |
| from fastapi import FastAPI, BackgroundTasks | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import JSONResponse | |
| app = FastAPI() | |
| TMP_DIR = "/tmp" | |
| if not os.path.exists(TMP_DIR): | |
| os.makedirs(TMP_DIR) | |
| app.mount("/tmp", StaticFiles(directory=TMP_DIR), name="tmp") | |
| class ClipTiming(BaseModel): | |
| id: str | |
| start: str # Format: 00:01:00,000 | |
| end: str # Format: 00:01:15,000 | |
| class ClipRequest(BaseModel): | |
| url: str | |
| timing: List[ClipTiming] | |
| webhook: str | |
| def timestamp_to_seconds(ts: str) -> float: | |
| h, m, rest = ts.split(":") | |
| s, ms = rest.split(",") | |
| return int(h) * 3600 + int(m) * 60 + int(s) + int(ms) / 1000 | |
| def process_clips(input_file: str, timing: List[ClipTiming], job_id: str, webhook_url: str): | |
| result_clips = [] | |
| for clip in timing: | |
| duration = timestamp_to_seconds(clip.end) - timestamp_to_seconds(clip.start) | |
| duration_str = f"{duration:.3f}" | |
| output_file = os.path.join(TMP_DIR, f"clip_{clip.id}.mp4") | |
| command = [ | |
| "ffmpeg", | |
| "-y", | |
| "-i", input_file, | |
| "-ss", clip.start.replace(",", "."), | |
| "-t", duration_str, | |
| "-c:v", "libx264", | |
| "-c:a", "aac", | |
| output_file | |
| ] | |
| proc = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) | |
| if proc.returncode == 0: | |
| result_clips.append({ | |
| "id": clip.id, | |
| "url": f"/tmp/{os.path.basename(output_file)}" | |
| }) | |
| else: | |
| print(f"[ERROR] FFmpeg failed on clip ID {clip.id}:\n{proc.stderr.decode()}") | |
| print(f"[DEBUG] webhook_url={webhook_url!r}") | |
| if webhook_url: | |
| print(f"[DEBUG] weeee") | |
| try: | |
| payload = {"status": "completed"} | |
| requests.post(webhook_url, json=payload, timeout=10) | |
| except Exception as e: | |
| print(f"[WEBHOOK ERROR] Failed to call webhook: {e}") | |
| async def clip_video_batch( | |
| data: ClipRequest, | |
| background_tasks: BackgroundTasks | |
| ): | |
| job_id = str(uuid.uuid4()) | |
| input_file = os.path.join(TMP_DIR, f"input_{job_id}.mp4") | |
| # Download video once | |
| async with aiohttp.ClientSession() as session: | |
| async with session.get(data.url) as resp: | |
| if resp.status != 200: | |
| return JSONResponse(content={"error": "Failed to download video"}, status_code=400) | |
| with open(input_file, "wb") as f: | |
| f.write(await resp.read()) | |
| # Run in background | |
| background_tasks.add_task(process_clips, input_file, data.timing, job_id, data.webhook) | |
| # Respond right away | |
| return { | |
| "status": "started", | |
| "task_id": job_id, | |
| "message": "Clipping started. Webhook will be called when complete." | |
| } | |
| def list_tmp_files(): | |
| files = os.listdir(TMP_DIR) | |
| return {"files": files} | |
| def tmp_size(): | |
| total_size = 0 | |
| for file in os.listdir(TMP_DIR): | |
| path = os.path.join(TMP_DIR, file) | |
| if os.path.isfile(path): | |
| total_size += os.path.getsize(path) | |
| return {"size_mb": round(total_size / (1024 * 1024), 2)} | |
| def cleanup_tmp(): | |
| for file in os.listdir(TMP_DIR): | |
| path = os.path.join(TMP_DIR, file) | |
| try: | |
| os.remove(path) | |
| except Exception: | |
| continue | |
| return {"status": "All files deleted"} | |