ffmpeg_cats / app3.py
Pepguy's picture
Rename app.py to app3.py
5522fe2 verified
raw
history blame
3.93 kB
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import subprocess
import base64
import os
import uuid
import shutil
import yt_dlp
# FastAPI application instance; the route decorators in this file register
# their handlers on it.
app = FastAPI()
@app.get("/")
def greet_json():
    """Return a fixed greeting payload for GET requests on the root path."""
    greeting = {"Hello": "Viral Cat Media Server Running!"}
    return greeting
# Request body accepted by the POST /process-video endpoint.
class VideoRequest(BaseModel):
    # URL of the video to fetch; handed to yt-dlp for download.
    url: str
    # Required string field; not read by the handler in the visible part of
    # this file.
    platform: str
    # When True, the 120-second duration limit is not enforced.
    is_pro: bool = False
    # Sampling rate given to ffmpeg's fps filter (frames per second).
    fps: int = 2
def file_to_base64(filepath):
    """Read the file at *filepath* and return its bytes as a Base64 string.

    The whole file is read into memory in binary mode; the Base64 output is
    returned as a ``str`` decoded with UTF-8.
    """
    with open(filepath, mode="rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')
def _stderr_text(exc):
    """Return the stderr captured on *exc* as text, or "" when there is none.

    ``subprocess.CalledProcessError.stderr`` is ``bytes`` when the command was
    run with ``capture_output=True`` alone, and ``str`` when ``text=True`` was
    also passed, so both forms are handled.
    """
    stderr = getattr(exc, "stderr", None)
    if not stderr:
        return ""
    if isinstance(stderr, bytes):
        return stderr.decode("utf-8", errors="replace")
    return str(stderr)


def _has_audio_stream(video_path):
    """Return True when ffprobe lists at least one audio stream in the file."""
    probe = subprocess.run([
        "ffprobe", "-v", "error", "-select_streams", "a",
        "-show_entries", "stream=index", "-of", "csv=p=0",
        video_path
    ], capture_output=True, text=True, check=True)
    return bool(probe.stdout.strip())


@app.post("/process-video")
def process_video(req: VideoRequest):
    """Download a video, then return its sampled frames and audio as Base64.

    Steps: download with yt-dlp into a per-request temporary directory, read
    the duration with ffprobe, extract JPEG frames at ``req.fps`` frames per
    second, extract a mono MP3 audio track (skipped when the file has no audio
    stream), and Base64-encode everything into the response.

    Returns:
        A dict with ``success``, ``duration`` (seconds), ``total_frames_extracted``,
        ``frames`` (list of Base64 JPEGs in frame order), ``audio`` (Base64 MP3
        or ``None``) and a placeholder ``thumbnail_url``.

    Raises:
        HTTPException: status 400 for request-level problems (non-positive
            ``fps``, nothing downloaded, video longer than 120 seconds for
            non-Pro requests); status 500 for any other failure, with the
            failing tool's stderr appended to the detail when available.
    """
    job_id = str(uuid.uuid4())
    work_dir = f"/tmp/viralcat_{job_id}"
    os.makedirs(work_dir, exist_ok=True)

    video_path = f"{work_dir}/video.mp4"
    audio_path = f"{work_dir}/audio.mp3"

    try:
        # Reject an unusable frame rate up front: ffmpeg's fps filter would
        # otherwise fail later and surface as a 500 instead of a client error.
        if req.fps <= 0:
            raise ValueError("fps must be a positive integer.")

        # 1. Download Video
        # 'extractor_args' asks yt-dlp to use YouTube's Android player client;
        # per the original author's note this works around YouTube blocking
        # cloud IPs.
        # NOTE(review): the duration limit below is only enforced after the
        # full download has completed.
        ydl_opts = {
            'format': 'bestvideo[height<=720][ext=mp4]+bestaudio[ext=m4a]/best[height<=720][ext=mp4]/best',
            'outtmpl': video_path,
            'quiet': True,
            'no_warnings': True,
            'nocheckcertificate': True,
            'extractor_args': {'youtube': ['player_client=android']}
        }

        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            ydl.download([req.url])

        # Verify video downloaded successfully. The output may have been
        # written under a different name than requested, so fall back to
        # whatever landed in the work dir. Sorting makes the choice
        # deterministic, and partial-download artefacts are skipped.
        if not os.path.exists(video_path):
            candidates = sorted(
                name for name in os.listdir(work_dir)
                if not name.endswith((".part", ".ytdl"))
            )
            if not candidates:
                raise ValueError("Failed to download video file.")
            video_path = os.path.join(work_dir, candidates[0])

        # 2. Check Duration (Limit to 2 minutes / 120 seconds if not Pro)
        probe = subprocess.run([
            "ffprobe", "-v", "error", "-show_entries",
            "format=duration", "-of", "default=noprint_wrappers=1:nokey=1",
            video_path
        ], capture_output=True, text=True, check=True)

        # Empty ffprobe output is treated as a duration of 0 seconds.
        duration = float(probe.stdout.strip() or 0)

        if duration > 120 and not req.is_pro:
            raise ValueError(f"Video is {duration:.1f}s long. Free users are limited to 120 seconds.")

        # 3. Dynamic Frame Extraction (e.g., 2 frames per second)
        subprocess.run([
            "ffmpeg", "-y", "-i", video_path,
            "-vf", f"fps={req.fps}",
            "-q:v", "2", f"{work_dir}/frame_%04d.jpg"
        ], check=True, capture_output=True)

        # 4. Extract Uncut Audio
        # '-map a' makes ffmpeg fail on a file without an audio stream, so
        # only run it when one exists; otherwise "audio" is None below.
        # NOTE(review): '-q:a 0' and '-b:a 64k' both set audio quality and
        # look contradictory — confirm which one is intended.
        if _has_audio_stream(video_path):
            subprocess.run([
                "ffmpeg", "-y", "-i", video_path,
                "-q:a", "0", "-map", "a", "-ac", "1", "-b:a", "64k", audio_path
            ], check=True, capture_output=True)

        # 5. Gather and convert to Base64 (sorted so frames stay in order)
        frame_files = sorted(
            f for f in os.listdir(work_dir)
            if f.startswith("frame_") and f.endswith(".jpg")
        )
        frames_b64 = [file_to_base64(os.path.join(work_dir, f)) for f in frame_files]
        audio_b64 = file_to_base64(audio_path) if os.path.exists(audio_path) else None

        return {
            "success": True,
            "duration": duration,
            "total_frames_extracted": len(frames_b64),
            "frames": frames_b64,
            "audio": audio_b64,
            "thumbnail_url": "https://placehold.co/400x600/png"
        }

    except ValueError as ve:
        raise HTTPException(status_code=400, detail=str(ve)) from ve
    except Exception as e:
        error_detail = str(e)
        # stderr may be bytes or str depending on how the subprocess was run;
        # calling .decode() unconditionally would raise here and hide the
        # real error.
        stderr_text = _stderr_text(e)
        if stderr_text:
            error_detail += f" | {stderr_text}"
        raise HTTPException(status_code=500, detail=error_detail) from e
    finally:
        # CLEANUP: always remove the per-request work dir to prevent server bloat
        if os.path.exists(work_dir):
            shutil.rmtree(work_dir, ignore_errors=True)
            print(f"[CLEANUP] Deleted temporary folder: {work_dir}")