Spaces:
Running
Running
File size: 2,582 Bytes
057ca82 df0bd72 decf4fe df0bd72 057ca82 decf4fe 057ca82 decf4fe df0bd72 057ca82 df0bd72 decf4fe df0bd72 057ca82 df0bd72 057ca82 df0bd72 057ca82 e3cf3b1 decf4fe 057ca82 decf4fe 057ca82 decf4fe df0bd72 decf4fe df0bd72 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 | from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import subprocess
import base64
import os
import uuid
import shutil
import whisper
# Application object; the route below registers itself on it via @app.post.
app = FastAPI()
# Load the Whisper "tiny" model once, at import time, so every request
# reuses the same in-memory model instead of reloading it per call.
print("Loading Whisper...")
whisper_model = whisper.load_model("tiny")
class VideoJsonRequest(BaseModel):
    # Request body for POST /process-video.
    # Holds the video as base64 text; the handler decodes it and writes the
    # resulting bytes to a temporary video.mp4 before processing.
    video_base64: str
def file_to_base64(filepath):
    """Return the contents of *filepath* encoded as a base64 string.

    The file is read in binary mode and the encoded bytes are decoded to
    UTF-8 text. Returns None when the path does not exist.
    """
    if os.path.exists(filepath):
        with open(filepath, "rb") as handle:
            raw = handle.read()
        return base64.b64encode(raw).decode('utf-8')
    return None
def _has_audio_stream(video_path):
    """Return True if ffprobe lists at least one audio stream in *video_path*."""
    probe = subprocess.run([
        "ffprobe", "-v", "error", "-select_streams", "a",
        "-show_entries", "stream=index", "-of", "csv=p=0",
        video_path
    ], capture_output=True, text=True, check=True)
    # One line is printed per matching stream; empty output means no audio.
    return bool(probe.stdout.strip())


@app.post("/process-video")
async def process_video(req: VideoJsonRequest):
    """Extract frames and a speech transcript from a base64-encoded video.

    The payload is decoded into a per-request scratch directory, sampled
    into JPEG frames with ffmpeg, and its audio track (when present) is
    transcribed with the module-level Whisper model. The scratch directory
    is removed before returning, whether or not processing succeeded.

    Args:
        req: Request body whose ``video_base64`` field holds the video bytes.

    Returns:
        ``{"success": True, "transcript": str, "frames": list}`` on success,
        where ``frames`` holds the base64-encoded JPEG frames in filename
        order; otherwise ``{"success": False, "error": str}``. Failures are
        reported in the response body rather than raised.
    """
    # NOTE(review): subprocess.run and whisper_model.transcribe are called
    # synchronously inside this async handler, so the event loop is occupied
    # for the whole request — confirm that is acceptable for this service.
    job_id = str(uuid.uuid4())
    work_dir = f"/tmp/viralcat_{job_id}"
    os.makedirs(work_dir, exist_ok=True)
    video_path = os.path.join(work_dir, "video.mp4")
    audio_path = os.path.join(work_dir, "audio.wav")
    try:
        # 1. Decode Base64 string into actual MP4 file
        with open(video_path, "wb") as f:
            f.write(base64.b64decode(req.video_base64))

        # 2. Get Duration (seconds); empty ffprobe output is treated as 0
        probe = subprocess.run([
            "ffprobe", "-v", "error", "-show_entries",
            "format=duration", "-of", "default=noprint_wrappers=1:nokey=1",
            video_path
        ], capture_output=True, text=True, check=True)
        duration = float(probe.stdout.strip() or 0)

        # 3. Sample frames. The fps filter rate is one frame per
        # max(duration, 1) seconds, and -vframes caps the output at 15.
        # NOTE(review): at this rate a clip yields about one frame, not 15;
        # 15 / max(duration, 1) would spread 15 frames across the clip —
        # confirm which is intended.
        fps = 1 / max(duration, 1)
        subprocess.run([
            "ffmpeg", "-y", "-i", video_path,
            "-vf", f"fps={fps}",
            "-vframes", "15",
            "-q:v", "4", f"{work_dir}/frame_%03d.jpg"
        ], check=True, capture_output=True)

        # 4. Extract & Transcribe Audio. A video without an audio stream
        # makes the ffmpeg extraction exit non-zero, which would fail the
        # whole request; skip the step and return an empty transcript.
        transcript = ""
        if _has_audio_stream(video_path):
            # 16 kHz mono 16-bit PCM WAV
            subprocess.run([
                "ffmpeg", "-y", "-i", video_path,
                "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1", audio_path
            ], check=True, capture_output=True)
            result = whisper_model.transcribe(audio_path)
            transcript = result["text"].strip()

        # 5. Gather frames in filename order (frame_001.jpg, frame_002.jpg, ...)
        frame_files = sorted(f for f in os.listdir(work_dir) if f.startswith("frame_"))
        frames_b64 = [file_to_base64(os.path.join(work_dir, f)) for f in frame_files]
        return {
            "success": True,
            "transcript": transcript,
            "frames": frames_b64
        }
    except Exception as e:
        # Top-level boundary: report any failure to the client in the body.
        return {"success": False, "error": str(e)}
    finally:
        # ignore_errors already covers a missing directory.
        shutil.rmtree(work_dir, ignore_errors=True)