import os
import uuid
import subprocess
from typing import List

import aiohttp
import requests
from pydantic import BaseModel
from fastapi import FastAPI, BackgroundTasks
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse

app = FastAPI()

TMP_DIR = "/tmp"
os.makedirs(TMP_DIR, exist_ok=True)

# Serve generated clips as static files under /tmp
app.mount("/tmp", StaticFiles(directory=TMP_DIR), name="tmp")


class ClipTiming(BaseModel):
    id: str
    start: str  # Format: 00:01:00,000
    end: str  # Format: 00:01:15,000


class ClipRequest(BaseModel):
    url: str
    timing: List[ClipTiming]
    webhook: str


def timestamp_to_seconds(ts: str) -> float:
    """Convert an HH:MM:SS,mmm timestamp to seconds."""
    h, m, rest = ts.split(":")
    s, ms = rest.split(",")
    return int(h) * 3600 + int(m) * 60 + int(s) + int(ms) / 1000


def process_clips(input_file: str, timing: List[ClipTiming], job_id: str, webhook_url: str):
    """Cut each requested clip with ffmpeg, then notify the webhook."""
    result_clips = []

    for clip in timing:
        duration = timestamp_to_seconds(clip.end) - timestamp_to_seconds(clip.start)
        duration_str = f"{duration:.3f}"
        output_file = os.path.join(TMP_DIR, f"clip_{clip.id}.mp4")

        command = [
            "ffmpeg",
            "-y",
            "-i", input_file,
            "-ss", clip.start.replace(",", "."),  # ffmpeg expects HH:MM:SS.mmm
            "-t", duration_str,
            "-c:v", "libx264",
            "-c:a", "aac",
            output_file,
        ]

        proc = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        if proc.returncode == 0:
            result_clips.append({
                "id": clip.id,
                "url": f"/tmp/{os.path.basename(output_file)}",
            })
        else:
            stderr_text = proc.stderr.decode(errors="replace")
            print(f"[ERROR] FFmpeg failed on clip ID {clip.id}:\n{stderr_text}")

    print(f"[DEBUG] webhook_url={webhook_url!r}")
    if webhook_url:
        try:
            # Report the job ID and the clips that were produced
            payload = {"status": "completed", "task_id": job_id, "clips": result_clips}
            requests.post(webhook_url, json=payload, timeout=10)
        except Exception as e:
            print(f"[WEBHOOK ERROR] Failed to call webhook: {e}")


@app.post("/clip")
async def clip_video_batch(data: ClipRequest, background_tasks: BackgroundTasks):
    job_id = str(uuid.uuid4())
    input_file = os.path.join(TMP_DIR, f"input_{job_id}.mp4")

    # Download video once, streaming it to disk in 1 MiB chunks
    async with aiohttp.ClientSession() as session:
        async with session.get(data.url) as resp:
            if resp.status != 200:
                return JSONResponse(content={"error": "Failed to download video"}, status_code=400)
            with open(input_file, "wb") as f:
                async for chunk in resp.content.iter_chunked(1024 * 1024):
                    f.write(chunk)

    # Run in background
    background_tasks.add_task(process_clips, input_file, data.timing, job_id, data.webhook)

    # Respond right away
    return {
        "status": "started",
        "task_id": job_id,
        "message": "Clipping started. Webhook will be called when complete.",
    }


@app.get("/list")
def list_tmp_files():
    files = os.listdir(TMP_DIR)
    return {"files": files}


@app.get("/size")
def tmp_size():
    total_size = 0
    for file in os.listdir(TMP_DIR):
        path = os.path.join(TMP_DIR, file)
        if os.path.isfile(path):
            total_size += os.path.getsize(path)
    return {"size_mb": round(total_size / (1024 * 1024), 2)}


@app.delete("/cleanup")
def cleanup_tmp():
    for file in os.listdir(TMP_DIR):
        path = os.path.join(TMP_DIR, file)
        try:
            os.remove(path)
        except OSError:
            # Skip directories and files that cannot be removed
            continue
    return {"status": "All files deleted"}
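

# --- Optional sketch: stricter timestamp validation ---------------------------
# The timestamp parsing above trusts that "start" and "end" follow the
# HH:MM:SS,mmm format and that "end" comes after "start". The helpers below are
# a minimal sketch, under those assumptions, of how a bad range could be
# rejected before ffmpeg is started. The names TIMESTAMP_RE,
# parse_timestamp_strict and clip_duration_seconds are hypothetical and are not
# wired into the endpoints above.
import re

# Two-digit hours, minutes and seconds (00-59), then a comma and milliseconds.
TIMESTAMP_RE = re.compile(r"([0-9]{2}):([0-5][0-9]):([0-5][0-9]),([0-9]{3})")


def parse_timestamp_strict(ts: str) -> float:
    """Parse HH:MM:SS,mmm into seconds; raise ValueError for any other shape."""
    match = TIMESTAMP_RE.fullmatch(ts)
    if match is None:
        raise ValueError(f"Invalid timestamp {ts!r}, expected HH:MM:SS,mmm")
    h, m, s, ms = (int(part) for part in match.groups())
    return h * 3600 + m * 60 + s + ms / 1000


def clip_duration_seconds(start: str, end: str) -> float:
    """Return end - start in seconds; raise ValueError if it is not positive."""
    duration = parse_timestamp_strict(end) - parse_timestamp_strict(start)
    if duration <= 0:
        raise ValueError(f"Clip end {end!r} must be after start {start!r}")
    return duration


# Example use: call clip_duration_seconds(clip.start, clip.end) for every clip
# in a request and return a 400 response on ValueError instead of queuing work.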