Spaces:
Running
Running
| from fastapi import FastAPI | |
| from pydantic import BaseModel | |
| from typing import Optional | |
| import subprocess, base64, os, uuid, shutil, whisper | |
| app = FastAPI() | |
| whisper_model = whisper.load_model("tiny") | |
| # 🚨 DYNAMIC SCHEMA | |
| class VideoJsonRequest(BaseModel): | |
| video_base64: str | |
| num_frames: Optional[int] = 15 # Default to 15 | |
| get_transcript: Optional[bool] = True # Default to True | |
| def to_b64(path): | |
| with open(path, "rb") as f: return base64.b64encode(f.read()).decode('utf-8') | |
| async def index(): | |
| return {"success": True, "engine": "Dynamic Viral Cat Media Server"} | |
| async def process(req: VideoJsonRequest): | |
| uid = str(uuid.uuid4()) | |
| tmp = f"/tmp/{uid}" | |
| os.makedirs(tmp) | |
| v_p = f"{tmp}/v.mp4" | |
| a_p = f"{tmp}/a.wav" | |
| try: | |
| with open(v_p, "wb") as f: f.write(base64.b64decode(req.video_base64)) | |
| # Get Duration | |
| probe = subprocess.run(["ffprobe", "-v", "error", "-show_entries", "format=duration", "-of", "default=noprint_wrappers=1:nokey=1", v_p], capture_output=True, text=True).stdout | |
| dur = float(probe.strip() or 0) | |
| # 🚨 DYNAMIC FRAME MATH: Spreads requested frames evenly across duration | |
| calc_fps = req.num_frames / max(dur, 1) | |
| print("-------------------") | |
| print(f"Requested Frames: {req.num_frames} | Duration: {dur:.2f}s | Calculated FPS: {calc_fps:.2f}") | |
| print(f"Transcript requested: {req.get_transcript}") | |
| print("-------------------") | |
| # Extract X frames | |
| subprocess.run(["ffmpeg", "-y", | |
| "-loglevel", "error", | |
| "-i", v_p, "-vf", f"fps={calc_fps}", | |
| "-vframes", str(req.num_frames), | |
| "-q:v", "5", f"{tmp}/f_%03d.jpg"]) | |
| # 🚨 CONDITIONAL TRANSCRIPT | |
| txt = "" | |
| if req.get_transcript: | |
| subprocess.run(["ffmpeg", "-y", | |
| "-loglevel", "error", | |
| "-i", v_p, "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", a_p]) | |
| if os.path.exists(a_p): | |
| result = whisper_model.transcribe(a_p) | |
| lines = [f"[{s['start']:.2f}] {s['text'].strip()}" for s in result["segments"]] | |
| txt = "\n".join(lines) | |
| # Gather frames | |
| f_names = sorted([f"{tmp}/{f}" for f in os.listdir(tmp) if f.startswith("f_")]) | |
| imgs = [to_b64(f) for f in f_names] | |
| print("-------------------") | |
| print(f"Successfully extracted {len(imgs)} images.") | |
| print("-------------------") | |
| return {"success": True, "transcript": txt, "frames": imgs, "thumbnail": imgs[0] if imgs else None} | |
| except Exception as e: return {"success": False, "error": str(e)} | |
| finally: shutil.rmtree(tmp, ignore_errors=True) |