ffmpeg_cats / app2.py
Pepguy's picture
Rename app.py to app2.py
3a2ae67 verified
raw
history blame
2.77 kB
import asyncio
import base64
import os
import shutil
import subprocess
import uuid

from fastapi import FastAPI, File, Form, HTTPException, UploadFile
# Application instance; the route decorators below register their handlers on it.
app = FastAPI()
@app.get("/")
def greet_json():
    """Root endpoint: report that the service is online and name its engine."""
    status_payload = {
        "status": "online",
        "engine": "Viral Cat Local Processor",
    }
    return status_payload
def file_to_base64(filepath):
    """Read a file and return its contents as a base64-encoded string.

    Args:
        filepath: Path of the file to encode.

    Returns:
        The base64 text of the file's bytes, or ``None`` when no file exists
        at ``filepath``.
    """
    # Open directly instead of checking os.path.exists() first: the file could
    # disappear between the check and the open, and a single filesystem call
    # is enough to learn whether it is there.
    try:
        with open(filepath, "rb") as f:
            payload = f.read()
    except (FileNotFoundError, NotADirectoryError):
        # Same outcome the existence check used to give for a missing path.
        return None
    return base64.b64encode(payload).decode("utf-8")
# Hard cap on the number of frames extracted and returned per request, so the
# work done by ffmpeg and the size of the JSON payload both stay bounded.
_MAX_FRAMES = 80
# Longest video, in seconds, accepted when the request is not flagged as pro.
_FREE_DURATION_LIMIT_S = 120


def _run_video_pipeline(upload_stream, fps, is_pro):
    """Blocking implementation of the /process-video endpoint.

    Saves the upload into a per-job scratch directory, checks its duration,
    extracts JPEG frames and an MP3 audio track with ffmpeg, and returns
    everything base64-encoded. The scratch directory is always removed.

    Args:
        upload_stream: Readable binary file object holding the uploaded video.
        fps: Number of frames to sample per second of video; must be >= 1.
        is_pro: When true, the free-tier duration limit is not enforced.

    Returns:
        On success, a dict with ``success`` (True), ``total_frames``,
        ``duration``, ``frames`` (list of base64 strings) and ``audio``
        (base64 string or None). On any failure, a dict with ``success``
        (False) and ``error`` (the exception message).
    """
    job_id = str(uuid.uuid4())
    work_dir = f"/tmp/viralcat_{job_id}"
    os.makedirs(work_dir, exist_ok=True)
    video_path = os.path.join(work_dir, "video.mp4")
    audio_path = os.path.join(work_dir, "audio.mp3")
    try:
        # Reject a nonsensical sampling rate before doing any work; ffmpeg
        # would fail on it anyway, but with a far less readable message.
        if fps < 1:
            raise ValueError(f"fps must be a positive integer, got {fps}.")

        # 1. Save the uploaded file locally
        with open(video_path, "wb") as buffer:
            shutil.copyfileobj(upload_stream, buffer)

        # 2. Check duration (limit applies to non-pro requests only)
        probe = subprocess.run([
            "ffprobe", "-v", "error", "-show_entries",
            "format=duration", "-of", "default=noprint_wrappers=1:nokey=1",
            video_path
        ], capture_output=True, text=True, check=True)
        duration = float(probe.stdout.strip() or 0)
        if duration > _FREE_DURATION_LIMIT_S and not is_pro:
            raise ValueError(f"Video ({duration:.1f}s) exceeds 120s limit.")

        # 3. Extract frames at the requested rate. "-frames:v" stops ffmpeg
        # once the cap is reached, so a high fps or a long video cannot make
        # it write thousands of frames that would be discarded afterwards.
        subprocess.run([
            "ffmpeg", "-y", "-i", video_path,
            "-vf", f"fps={fps}",
            "-frames:v", str(_MAX_FRAMES),
            "-q:v", "4", f"{work_dir}/frame_%04d.jpg"
        ], check=True, capture_output=True)

        # 4. Extract audio (mono MP3)
        # NOTE(review): "-map a" makes ffmpeg fail when the input has no audio
        # stream, so silent videos are reported as errors - confirm intended.
        # NOTE(review): both "-q:a 0" and "-b:a 64k" are passed; confirm which
        # of the two the encoder actually honours.
        subprocess.run([
            "ffmpeg", "-y", "-i", video_path,
            "-q:a", "0", "-map", "a", "-ac", "1", "-b:a", "64k", audio_path
        ], check=True, capture_output=True)

        # 5. Base64 conversion. Zero-padded names make lexical order equal
        # frame order; the slice is a second line of defence behind -frames:v.
        frame_files = sorted(
            name for name in os.listdir(work_dir)
            if name.startswith("frame_") and name.endswith(".jpg")
        )[:_MAX_FRAMES]
        frames_b64 = [
            file_to_base64(os.path.join(work_dir, name)) for name in frame_files
        ]
        audio_b64 = file_to_base64(audio_path)
        print(f"Done with video, {len(frames_b64)}")
        return {
            "success": True,
            "total_frames": len(frames_b64),
            "duration": duration,
            "frames": frames_b64,
            "audio": audio_b64
        }
    except subprocess.CalledProcessError as e:
        # str(e) only carries the command and exit status; the tool's own
        # diagnostics are on stderr, so log their tail as well.
        print(f"ERROR: {str(e)}")
        stderr = e.stderr
        if isinstance(stderr, bytes):
            stderr = stderr.decode("utf-8", errors="replace")
        if stderr:
            print(f"STDERR: {stderr.strip()[-2000:]}")
        return {"success": False, "error": str(e)}
    except Exception as e:
        # Endpoint boundary: every failure is reported in the response body
        # rather than raised, which is the contract existing clients rely on.
        print(f"ERROR: {str(e)}")
        return {"success": False, "error": str(e)}
    finally:
        # CLEANUP
        shutil.rmtree(work_dir, ignore_errors=True)


@app.post("/process-video")
async def process_video(
    file: UploadFile = File(...),
    fps: int = Form(2),
    is_pro: bool = Form(False)
):
    """Extract base64-encoded frames and audio from an uploaded video.

    Args:
        file: The uploaded video.
        fps: Frames to sample per second of video (default 2).
        is_pro: Skips the 120-second duration limit when true.
            NOTE(review): this flag comes straight from the request form, so
            any client can set it - confirm it is validated upstream.

    Returns:
        A dict with ``success`` plus either the extracted data
        (``total_frames``, ``duration``, ``frames``, ``audio``) or an
        ``error`` message.
    """
    # The pipeline is all blocking work (file copy, ffprobe, two ffmpeg runs).
    # Running it directly in this coroutine would freeze the event loop and
    # stall every other request, so hand it to the default thread pool.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        None, _run_video_pipeline, file.file, fps, is_pro
    )