from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import subprocess
import base64
import os
import uuid
import shutil
import yt_dlp
# FastAPI application instance; the route handlers in this file register on it.
app = FastAPI()
@app.get("/")
def greet_json():
    """Serve the root route with a fixed JSON greeting.

    Returns:
        A one-entry dict announcing that the media server is running.
    """
    greeting = {"Hello": "Viral Cat Media Server Running!"}
    return greeting
# Request body accepted by the POST /process-video endpoint.
class VideoRequest(BaseModel):
    # Address of the video to fetch; handed unchanged to yt-dlp's download call.
    url: str
    # Required by the schema, but not read by the handler visible in this file.
    platform: str
    # When False, videos longer than 120 seconds are rejected by the handler.
    is_pro: bool = False
    # Frame sampling rate; interpolated into ffmpeg's ``fps=`` video filter.
    fps: int = 2
def file_to_base64(filepath):
    """Read a file in binary mode and return its contents Base64-encoded.

    Args:
        filepath: Path of the file to read.

    Returns:
        The file's bytes encoded as Base64, as a ``str``.
    """
    with open(filepath, "rb") as handle:
        raw_bytes = handle.read()
    encoded = base64.b64encode(raw_bytes)
    return encoded.decode('utf-8')
def _describe_error(exc):
    """Return ``str(exc)``, followed by the exception's captured stderr if any.

    ``subprocess.CalledProcessError.stderr`` is ``bytes`` for calls made
    without ``text=True`` and ``str`` for calls made with it, so both forms
    are accepted here instead of unconditionally calling ``.decode()``.
    """
    detail = str(exc)
    stderr = getattr(exc, "stderr", None)
    if stderr:
        if isinstance(stderr, bytes):
            stderr = stderr.decode("utf-8", errors="replace")
        detail += f" | {stderr}"
    return detail


def _download_video(url, video_path, work_dir):
    """Download *url* with yt-dlp into *work_dir* and return the file's path.

    Raises:
        ValueError: If no usable file exists in *work_dir* after the download.
    """
    ydl_opts = {
        'format': 'bestvideo[height<=720][ext=mp4]+bestaudio[ext=m4a]/best[height<=720][ext=mp4]/best',
        'outtmpl': video_path,
        'quiet': True,
        'no_warnings': True,
        'nocheckcertificate': True,
        # The Python API takes {extractor: {argument: [values]}}; the
        # "key=value" string form is CLI-only syntax.
        'extractor_args': {'youtube': {'player_client': ['android']}},
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])

    if os.path.exists(video_path):
        return video_path

    # The expected output name is missing: fall back to whatever was written,
    # ignoring yt-dlp's in-progress artifacts and picking deterministically.
    candidates = sorted(
        name for name in os.listdir(work_dir)
        if not name.endswith((".part", ".ytdl"))
    )
    if not candidates:
        raise ValueError("Failed to download video file.")
    return os.path.join(work_dir, candidates[0])


def _probe_duration(video_path):
    """Return the container duration in seconds reported by ffprobe.

    Empty ffprobe output is treated as a duration of 0.
    """
    probe = subprocess.run([
        "ffprobe", "-v", "error", "-show_entries",
        "format=duration", "-of", "default=noprint_wrappers=1:nokey=1",
        video_path
    ], capture_output=True, text=True, check=True)
    return float(probe.stdout.strip() or 0)


@app.post("/process-video")
def process_video(req: VideoRequest):
    """Download a video, then return sampled frames and its audio as Base64.

    The video is fetched with yt-dlp into a per-request directory under
    ``/tmp``, its duration is checked, frames are sampled at ``req.fps`` and
    the audio track is transcoded to mono MP3. The directory is always
    removed before the function returns or raises.

    Args:
        req: Request body; ``url``, ``fps`` and ``is_pro`` are used.

    Returns:
        A dict with ``success``, ``duration``, ``total_frames_extracted``,
        ``frames`` (list of Base64 JPEG strings), ``audio`` (Base64 MP3 or
        ``None``) and a placeholder ``thumbnail_url``.

    Raises:
        HTTPException: 400 for invalid input (non-positive ``fps``, failed
            download lookup, video longer than 120 s for non-Pro requests);
            500 for any other failure, with subprocess stderr appended to the
            detail when available.
    """
    job_id = str(uuid.uuid4())
    work_dir = f"/tmp/viralcat_{job_id}"
    os.makedirs(work_dir, exist_ok=True)
    video_path = f"{work_dir}/video.mp4"
    audio_path = f"{work_dir}/audio.mp3"
    try:
        # Reject an unusable frame rate before paying for the download;
        # ffmpeg's fps filter cannot run with a zero or negative rate.
        if req.fps <= 0:
            raise ValueError("fps must be a positive integer.")

        # 1. Download video
        video_path = _download_video(req.url, video_path, work_dir)

        # 2. Check duration (limit to 2 minutes / 120 seconds if not Pro)
        duration = _probe_duration(video_path)
        if duration > 120 and not req.is_pro:
            raise ValueError(f"Video is {duration:.1f}s long. Free users are limited to 120 seconds.")

        # 3. Frame extraction at the requested sampling rate
        subprocess.run([
            "ffmpeg", "-y", "-i", video_path,
            "-vf", f"fps={req.fps}",
            "-q:v", "2", f"{work_dir}/frame_%04d.jpg"
        ], check=True, capture_output=True)

        # 4. Extract the full audio track as mono MP3
        subprocess.run([
            "ffmpeg", "-y", "-i", video_path,
            "-q:a", "0", "-map", "a", "-ac", "1", "-b:a", "64k", audio_path
        ], check=True, capture_output=True)

        # 5. Gather outputs and convert to Base64; zero-padded names make the
        # lexicographic sort match frame order.
        frame_files = sorted(
            name for name in os.listdir(work_dir)
            if name.startswith("frame_") and name.endswith(".jpg")
        )
        frames_b64 = [file_to_base64(os.path.join(work_dir, name)) for name in frame_files]
        audio_b64 = file_to_base64(audio_path) if os.path.exists(audio_path) else None

        return {
            "success": True,
            "duration": duration,
            "total_frames_extracted": len(frames_b64),
            "frames": frames_b64,
            "audio": audio_b64,
            "thumbnail_url": "https://placehold.co/400x600/png"
        }
    except ValueError as ve:
        raise HTTPException(status_code=400, detail=str(ve)) from ve
    except Exception as e:
        raise HTTPException(status_code=500, detail=_describe_error(e)) from e
    finally:
        # Always remove the per-request directory so temp files do not pile up.
        if os.path.exists(work_dir):
            shutil.rmtree(work_dir, ignore_errors=True)
            print(f"[CLEANUP] Deleted temporary folder: {work_dir}")