Spaces:
Running
Running
File size: 2,915 Bytes
e2290de f07a195 e2290de 09b4df3 e2290de f07a195 e2290de f07a195 e2290de 1012f9d f07a195 1012f9d e2290de f07a195 e2290de f07a195 2fa21b8 f07a195 2fa21b8 f07a195 90d1287 f07a195 caed07e f07a195 e2290de f07a195 727613b f07a195 727613b f07a195 e2290de | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 | from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional
import subprocess, base64, os, uuid, shutil, whisper
# ASGI application object; the route decorators below register handlers on it.
app = FastAPI()
# Load the Whisper "tiny" checkpoint once at import time so that every request
# handled by this module reuses the same model instance.
whisper_model = whisper.load_model("tiny")
# 🚨 DYNAMIC SCHEMA
class VideoJsonRequest(BaseModel):
    # JSON request body accepted by the POST /process-video handler.
    # (Documented with comments rather than a class docstring so the generated
    # schema description stays exactly as it was.)

    # Base64-encoded bytes of the video; the handler decodes this and writes
    # the result to a temporary v.mp4 before running ffprobe/ffmpeg on it.
    video_base64: str
    # How many frames the handler should try to extract; it derives an fps
    # value from this and the probed duration so frames are spread evenly.
    num_frames: Optional[int] = 15  # Default to 15
    # When truthy, the handler extracts the audio track and runs the Whisper
    # model on it; when falsy the transcript is returned as an empty string.
    get_transcript: Optional[bool] = True  # Default to True
def to_b64(path):
    """Return the contents of the file at *path* as a base64 text string."""
    with open(path, "rb") as handle:
        raw = handle.read()
    encoded = base64.b64encode(raw)
    return encoded.decode('utf-8')
@app.get("/")
async def index():
    # Liveness endpoint: answers GET / with a fixed payload naming the engine.
    # Kept as a comment (not a docstring) so the route's generated API
    # description is unchanged.
    payload = {"success": True, "engine": "Dynamic Viral Cat Media Server"}
    return payload
@app.post("/process-video")
async def process(req: VideoJsonRequest):
    # Decode the uploaded video, extract evenly spaced JPEG frames with ffmpeg
    # and, when requested, transcribe its audio track with Whisper.
    #
    # Request fields used: req.video_base64, req.num_frames, req.get_transcript.
    #
    # Returns a dict:
    #   on success: {"success": True, "transcript": str, "frames": [base64 str],
    #                "thumbnail": first frame or None}
    #   on failure: {"success": False, "error": str}
    # Every exception raised while processing is converted into the failure
    # payload; the per-request scratch directory is always removed.
    #
    # Documented with comments rather than a docstring because FastAPI
    # publishes a route's docstring as its OpenAPI description.
    #
    # NOTE(review): subprocess.run and whisper_model.transcribe are synchronous
    # calls inside an async handler, so the event loop is blocked while they
    # run. Offloading them to a worker thread would need confirmation that the
    # shared Whisper model tolerates concurrent use.
    frame_count = req.num_frames
    if frame_count is None:
        # Optional[int] lets a client send an explicit null; report that
        # clearly instead of surfacing the TypeError from the fps division.
        return {"success": False, "error": "num_frames must be an integer, not null"}

    work_dir = f"/tmp/{uuid.uuid4()}"
    video_path = f"{work_dir}/v.mp4"
    audio_path = f"{work_dir}/a.wav"
    try:
        # Created inside the try so a filesystem failure is reported through
        # the same error payload as every other failure.
        os.makedirs(work_dir)
        with open(video_path, "wb") as video_file:
            video_file.write(base64.b64decode(req.video_base64))

        # Get Duration
        dur = _probe_duration(video_path)

        # 🚨 DYNAMIC FRAME MATH: Spreads requested frames evenly across duration
        # (clips shorter than one second are treated as one second long).
        calc_fps = frame_count / max(dur, 1)
        print("-------------------")
        print(f"Requested Frames: {frame_count} | Duration: {dur:.2f}s | Calculated FPS: {calc_fps:.2f}")
        print(f"Transcript requested: {req.get_transcript}")
        print("-------------------")

        # Extract X frames. A non-positive count means "no frames": skip the
        # ffmpeg call, which could only fail with fps <= 0, and return an
        # empty frame list exactly as before.
        if frame_count > 0:
            subprocess.run(["ffmpeg", "-y",
                            "-loglevel", "error",
                            "-i", video_path, "-vf", f"fps={calc_fps}",
                            "-vframes", str(frame_count),
                            "-q:v", "5", f"{work_dir}/f_%03d.jpg"])

        # 🚨 CONDITIONAL TRANSCRIPT
        txt = ""
        if req.get_transcript:
            # Convert the audio track to 16 kHz mono 16-bit PCM WAV.
            subprocess.run(["ffmpeg", "-y",
                            "-loglevel", "error",
                            "-i", video_path, "-vn", "-acodec", "pcm_s16le",
                            "-ar", "16000", "-ac", "1", audio_path])
            # Best effort: if ffmpeg produced no WAV file, the transcript
            # simply stays empty.
            if os.path.exists(audio_path):
                result = whisper_model.transcribe(audio_path)
                txt = "\n".join(
                    f"[{s['start']:.2f}] {s['text'].strip()}"
                    for s in result["segments"]
                )

        # Gather frames. Sort by (length, name) so that names which outgrow
        # the %03d padding (f_1000.jpg, ...) still come after f_999.jpg; a
        # plain lexicographic sort would put f_1000.jpg before f_101.jpg.
        frame_names = sorted(
            (name for name in os.listdir(work_dir) if name.startswith("f_")),
            key=lambda name: (len(name), name),
        )
        imgs = [to_b64(f"{work_dir}/{name}") for name in frame_names]
        print("-------------------")
        print(f"Successfully extracted {len(imgs)} images.")
        print("-------------------")
        return {"success": True, "transcript": txt, "frames": imgs, "thumbnail": imgs[0] if imgs else None}
    except Exception as e:
        return {"success": False, "error": str(e)}
    finally:
        shutil.rmtree(work_dir, ignore_errors=True)


def _probe_duration(video_path):
    """Return the duration ffprobe reports for *video_path* in seconds.

    Falls back to 0.0 when ffprobe prints nothing or something that is not a
    positive finite number (for example ``N/A``), so an unknown duration does
    not abort the whole request.
    """
    probe = subprocess.run(
        ["ffprobe", "-v", "error", "-show_entries", "format=duration",
         "-of", "default=noprint_wrappers=1:nokey=1", video_path],
        capture_output=True, text=True,
    )
    try:
        seconds = float(probe.stdout.strip())
    except ValueError:
        return 0.0
    # Rejects zero, negatives, NaN (comparisons are False) and infinity.
    return seconds if 0 < seconds < float("inf") else 0.0