# multiClipper / main.py
# clove1002's picture
# Update main.py
# 3909ac6 verified
# Raw
# History Blame Contribute Delete
# 3.65 kB
import os
import uuid
import shutil
import aiohttp
import requests
import traceback
import subprocess
from typing import List
from pathlib import Path
from pydantic import BaseModel
from fastapi import FastAPI, BackgroundTasks
from fastapi.staticfiles import StaticFiles
from fastapi.responses import JSONResponse
app = FastAPI()

# Working directory for downloaded source videos and generated clips.
# NOTE(review): this is the system-wide temp directory and the mount below
# serves everything inside it over HTTP; a dedicated subdirectory would limit
# what gets exposed -- confirm before changing, other handlers share TMP_DIR.
TMP_DIR = "/tmp"

# exist_ok=True avoids the check-then-create race of a separate
# os.path.exists() test.
os.makedirs(TMP_DIR, exist_ok=True)

# Serve the contents of TMP_DIR under the /tmp URL prefix.
app.mount("/tmp", StaticFiles(directory=TMP_DIR), name="tmp")
class ClipTiming(BaseModel):
    """Identifier plus start and end timestamps of one clip to cut."""

    # Identifier of the clip; it becomes part of the output file name.
    id: str
    # Clip start. Format: 00:01:00,000
    start: str
    # Clip end. Format: 00:01:15,000
    end: str
class ClipRequest(BaseModel):
    """Request body accepted by the ``/clip`` endpoint."""

    # Location the source video is downloaded from.
    url: str
    # Clips to cut out of the downloaded video.
    timing: List[ClipTiming]
    # URL that receives a POST once processing has finished.
    webhook: str
def timestamp_to_seconds(ts: str) -> float:
    """Convert an ``HH:MM:SS[,fff]`` timestamp into seconds.

    The fractional part may be introduced by ``,`` (SRT style) or ``.`` and is
    read as a decimal fraction of a second, so ``"00:00:01,5"`` is 1.5 s.
    That matches how ffmpeg interprets the same text once the comma has been
    replaced by a dot.  The fraction may also be omitted entirely.

    Args:
        ts: Timestamp text, e.g. ``"00:01:15,000"``.

    Returns:
        The timestamp expressed in seconds, always as a float.

    Raises:
        ValueError: If ``ts`` does not have three ``:``-separated fields or
            contains non-numeric parts.
    """
    hours, minutes, rest = ts.strip().split(":")
    seconds, _, fraction = rest.replace(",", ".").partition(".")

    whole = int(hours) * 3600 + int(minutes) * 60 + int(seconds)
    if not fraction:
        return float(whole)
    if not fraction.isdigit():
        raise ValueError(f"invalid fractional seconds in timestamp: {ts!r}")
    # Scale by the number of digits so "5" means 0.5 s and "500" means 0.5 s.
    return whole + int(fraction) / 10 ** len(fraction)
def process_clips(input_file: str, timing: List[ClipTiming], job_id: str, webhook_url: str):
    """Cut each requested clip out of ``input_file`` with ffmpeg, then call the webhook.

    A clip that cannot be produced (unsafe id, unparsable timestamp,
    non-positive duration, ffmpeg missing or failing) is logged and reported
    under ``failed`` instead of aborting the remaining clips.  The source
    file is removed once all clips have been attempted.

    Args:
        input_file: Path of the already-downloaded source video.
        timing: Clips to produce; each supplies ``id``, ``start`` and ``end``.
        job_id: Identifier of this job, echoed in the webhook payload.
        webhook_url: URL that receives the result as a JSON POST; the call is
            skipped when this is empty.
    """
    result_clips = []
    failed_ids = []
    tmp_root = os.path.abspath(TMP_DIR)

    try:
        for clip in timing:
            output_file = os.path.join(TMP_DIR, f"clip_{clip.id}.mp4")

            # clip.id comes from the request body; refuse ids containing path
            # components that would put the output outside TMP_DIR.
            if os.path.dirname(os.path.abspath(output_file)) != tmp_root:
                print(f"[ERROR] Unsafe clip ID {clip.id!r}: output would leave {TMP_DIR}")
                failed_ids.append(clip.id)
                continue

            try:
                duration = timestamp_to_seconds(clip.end) - timestamp_to_seconds(clip.start)
            except ValueError as e:
                print(f"[ERROR] Bad timestamp on clip ID {clip.id}: {e}")
                failed_ids.append(clip.id)
                continue

            if duration <= 0:
                print(f"[ERROR] Non-positive duration on clip ID {clip.id}: {duration:.3f}s")
                failed_ids.append(clip.id)
                continue

            command = [
                "ffmpeg",
                "-y",
                "-i", input_file,
                # ffmpeg expects a dot, not a comma, before the fraction.
                "-ss", clip.start.replace(",", "."),
                "-t", f"{duration:.3f}",
                "-c:v", "libx264",
                "-c:a", "aac",
                output_file,
            ]

            try:
                proc = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            except OSError as e:
                # Raised when the ffmpeg executable cannot be started.
                print(f"[ERROR] Could not run ffmpeg for clip ID {clip.id}: {e}")
                failed_ids.append(clip.id)
                continue

            if proc.returncode == 0:
                result_clips.append({
                    "id": clip.id,
                    "url": f"/tmp/{os.path.basename(output_file)}",
                })
            else:
                # ffmpeg output is not guaranteed to be valid UTF-8.
                stderr_text = proc.stderr.decode(errors="replace")
                print(f"[ERROR] FFmpeg failed on clip ID {clip.id}:\n{stderr_text}")
                failed_ids.append(clip.id)
    finally:
        # The source download is only needed while cutting; drop it so that
        # finished jobs do not keep filling the temp directory.
        try:
            os.remove(input_file)
        except OSError as e:
            print(f"[WARN] Could not remove source file {input_file}: {e}")

    if webhook_url:
        payload = {
            "status": "completed",
            "task_id": job_id,
            "clips": result_clips,
            "failed": failed_ids,
        }
        try:
            requests.post(webhook_url, json=payload, timeout=10)
        except Exception as e:
            # Best effort: a failing webhook must not crash the background task.
            print(f"[WEBHOOK ERROR] Failed to call webhook: {e}")
@app.post("/clip")
async def clip_video_batch(
    data: ClipRequest,
    background_tasks: BackgroundTasks
):
    """Download the source video once and schedule clip cutting in the background.

    Args:
        data: Request body with the video URL, the clip timings and the webhook.
        background_tasks: FastAPI hook used to run ``process_clips`` after the
            response has been sent.

    Returns:
        A dict with ``status``, ``task_id`` and ``message`` when the download
        succeeded, or a 400 ``JSONResponse`` when it did not.
    """
    import asyncio  # local import: only needed for the timeout exception type

    job_id = str(uuid.uuid4())
    input_file = os.path.join(TMP_DIR, f"input_{job_id}.mp4")

    # Download video once.
    # NOTE(security): data.url is fetched exactly as supplied by the client;
    # if this service can reach internal hosts, restrict the allowed targets.
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(data.url) as resp:
                if resp.status != 200:
                    return JSONResponse(content={"error": "Failed to download video"}, status_code=400)
                with open(input_file, "wb") as f:
                    # Stream in chunks so a large video is never held in
                    # memory in one piece.
                    async for chunk in resp.content.iter_chunked(1024 * 1024):
                        f.write(chunk)
    except (aiohttp.ClientError, asyncio.TimeoutError) as e:
        print(f"[ERROR] Download failed for job {job_id}: {e}")
        # Do not leave a truncated file behind.
        if os.path.exists(input_file):
            os.remove(input_file)
        return JSONResponse(content={"error": "Failed to download video"}, status_code=400)

    # Run in background
    background_tasks.add_task(process_clips, input_file, data.timing, job_id, data.webhook)

    # Respond right away
    return {
        "status": "started",
        "task_id": job_id,
        "message": "Clipping started. Webhook will be called when complete."
    }
@app.get("/list")
def list_tmp_files():
    """Return the names of all entries currently present in TMP_DIR."""
    return {"files": os.listdir(TMP_DIR)}
@app.get("/size")
def tmp_size():
    """Report the combined size, in MiB, of the files directly inside TMP_DIR.

    Entries that are not files (e.g. subdirectories) are not counted.
    """
    candidates = (os.path.join(TMP_DIR, name) for name in os.listdir(TMP_DIR))
    byte_count = sum(os.path.getsize(p) for p in candidates if os.path.isfile(p))
    return {"size_mb": round(byte_count / (1024 * 1024), 2)}
@app.delete("/cleanup")
def cleanup_tmp():
    """Delete every file directly inside TMP_DIR, best effort.

    Entries that cannot be removed with ``os.remove`` (directories, files
    without permission, files that vanished meanwhile) are skipped and listed
    in the response rather than silently ignored.

    Returns:
        A dict with the unchanged ``status`` message, the number of entries
        ``deleted`` and the names of the ``skipped`` entries.
    """
    deleted = 0
    skipped = []
    for name in os.listdir(TMP_DIR):
        try:
            os.remove(os.path.join(TMP_DIR, name))
        except OSError:
            # Keep going: one undeletable entry must not stop the cleanup.
            skipped.append(name)
        else:
            deleted += 1
    # "status" is kept verbatim for existing clients; check "skipped" to see
    # whether anything was actually left behind.
    return {"status": "All files deleted", "deleted": deleted, "skipped": skipped}