# Dynamic Video Subtitle & AI Voiceover Backend Architecture
This document provides the foundational architecture, system workflow, and code structure for a fully Python-based video generation API backend using FastAPI, Supabase, faster-whisper, VieNeu-TTS, and FFmpeg.
## 1. System Architecture & Workflow Diagram
Video processing and AI inference are heavily CPU- and GPU-bound. We therefore cannot block the FastAPI server threads; these operations must be offloaded to a task queue.
Workflow:
- Client Upload: The user uploads a raw video and script directly to Supabase Storage (via a presigned URL; see the sketch after this list) or through FastAPI, which saves them to Supabase.
- Task Creation: FastAPI receives the processing request (with the Supabase file path), saves a job record in the Supabase DB, and pushes a task to Redis.
- Queue / Message Broker: Redis acts as the broker holding pending video generation jobs.
- Celery Worker (The Core Engine):
- Picks up the task.
- Downloads the raw video/audio from Supabase.
- Runs faster-whisper to extract word-level timestamps.
- Runs VieNeu-TTS to generate the voiceover from the script.
- Generates a dynamic `.ass` file mapping timestamps to bouncing subtitles.
- Uses ffmpeg-python to merge the new audio, burn in the `.ass` subtitles, and output the final video.
- Uploads the resulting `.mp4` to Supabase.
- Updates the Supabase DB row to `completed`.
- Client Notification: The frontend polls FastAPI (or uses websockets) for the job status and plays the final video.
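For the direct-to-Storage path, the API can hand the client a short-lived signed upload URL instead of proxying file bytes through FastAPI. A minimal sketch, assuming the `app` and `supabase` objects defined in `main.py` (section 2) and that your supabase-py/storage3 version exposes `create_signed_upload_url` (verify against your installed release):

```python
# Sketch only: issues a short-lived signed upload URL so large videos go straight
# to Storage instead of through FastAPI. Assumes the `app` and `supabase` objects
# from main.py (section 2); `create_signed_upload_url` ships with recent
# supabase-py/storage3 releases, so verify it against your installed version.
@app.post("/api/v1/uploads/sign")
async def sign_upload(filename: str):
    path = f"raw_videos/{filename}"
    signed = supabase.storage.from_("content").create_signed_upload_url(path)
    # The client uploads with a PUT to signed["signed_url"] (or via storage3's
    # upload_to_signed_url), then submits the job with `path` instead of raw bytes.
    return {"path": path, "upload": signed}
```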
```mermaid
sequenceDiagram
    participant User
    participant FastAPI
    participant Supabase
    participant Redis_Queue
    participant Celery_Worker

    User->>FastAPI: POST /generate (video, script, voice_ref)
    FastAPI->>Supabase: Save raw assets to Storage
    FastAPI->>Supabase: Create job record (status: 'pending')
    FastAPI->>Redis_Queue: Enqueue 'render_video' task
    FastAPI-->>User: Return Job ID (async response)

    Celery_Worker->>Redis_Queue: Dequeue task
    Celery_Worker->>Supabase: Download raw assets
    Celery_Worker->>Celery_Worker: 1. faster-whisper (timestamps)
    Celery_Worker->>Celery_Worker: 2. VieNeu-TTS (voiceover)
    Celery_Worker->>Celery_Worker: 3. Generate dynamic .ass subtitle
    Celery_Worker->>Celery_Worker: 4. ffmpeg-python (burn & merge)
    Celery_Worker->>Supabase: Upload final video.mp4
    Celery_Worker->>Supabase: Update job status ('completed')

    User->>FastAPI: GET /status/{job_id}
    FastAPI->>Supabase: Check DB record
    FastAPI-->>User: Return final video URL
```
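Both the API and the worker below read and write a `video_jobs` table. A minimal schema sketch, with columns inferred from the code in sections 2 and 3 (adjust types and defaults to taste):

```sql
-- Hypothetical schema inferred from the fields used in main.py and worker.py.
create table video_jobs (
  id uuid primary key default gen_random_uuid(),
  status text not null default 'pending',  -- pending | processing | completed | failed
  script text,
  raw_video_path text,
  result_url text,
  error text,
  created_at timestamptz default now()
);
```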
## 2. FastAPI Setup & Supabase Integration
We will define the `.env` configuration and two simple endpoints.
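A sample `.env` (the variable names match what the code reads; values are placeholders):

```env
SUPABASE_URL=https://your-project.supabase.co
SUPABASE_SERVICE_ROLE_KEY=your-service-role-key
REDIS_URL=redis://localhost:6379/0
```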
`main.py`
```python
import os
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from pydantic import BaseModel
from supabase import create_client, Client
from worker import render_video_task  # Celery task

app = FastAPI(title="Video AI Processing API")

# Supabase setup (use the service role key for backend operations)
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)

class RenderJobRequest(BaseModel):
    # (Not used by the form endpoint below; reserved for a JSON submission variant)
    script_text: str
    voice_preset_id: str = "default"

@app.post("/api/v1/jobs/submit")
async def submit_job(
    script: str = Form(...),
    video: UploadFile = File(...),
    ref_audio: UploadFile = File(None),
):
    """Receives frontend files, stores them, and dispatches a Celery task."""
    # 1. Upload assets to Supabase Storage
    video_bytes = await video.read()
    video_path = f"raw_videos/{video.filename}"
    supabase.storage.from_("content").upload(path=video_path, file=video_bytes)

    ref_audio_path = None
    if ref_audio:
        ref_audio_bytes = await ref_audio.read()
        ref_audio_path = f"references/{ref_audio.filename}"
        supabase.storage.from_("content").upload(path=ref_audio_path, file=ref_audio_bytes)

    # 2. Create a DB record to track the job
    db_resp = supabase.table("video_jobs").insert({
        "status": "pending",
        "script": script,
        "raw_video_path": video_path,
    }).execute()
    job_id = db_resp.data[0]["id"]

    # 3. Dispatch to the Celery queue
    render_video_task.delay(job_id, video_path, script, ref_audio_path)
    return {"job_id": job_id, "status": "processing_queued"}

@app.get("/api/v1/jobs/{job_id}")
async def get_job_status(job_id: str):
    response = supabase.table("video_jobs").select("*").eq("id", job_id).execute()
    if not response.data:
        raise HTTPException(status_code=404, detail="Job not found")
    return response.data[0]
```
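A quick way to smoke-test both endpoints from a separate Python shell, using `requests` (the local file names here are hypothetical):

```python
import time
import requests

BASE = "http://localhost:8000"

# Submit a job (sample.mp4 and ref.wav are hypothetical local files)
with open("sample.mp4", "rb") as video, open("ref.wav", "rb") as ref:
    resp = requests.post(
        f"{BASE}/api/v1/jobs/submit",
        data={"script": "Xin chào, đây là video demo."},
        files={"video": video, "ref_audio": ref},
    )
job_id = resp.json()["job_id"]

# Poll until the worker finishes
while True:
    job = requests.get(f"{BASE}/api/v1/jobs/{job_id}").json()
    if job["status"] in ("completed", "failed"):
        print(job)
        break
    time.sleep(5)
```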
## 3. Task Management: Celery + Redis vs BackgroundTasks
Senior advice: never use FastAPI's native `BackgroundTasks` for heavy AI (Whisper/VieNeu) or rendering computations. `BackgroundTasks` run inside the same process as your web application (on the event loop, or in its thread pool for sync functions). A 3-minute video render will hog that process, lock up concurrent requests, and leave the server frozen and rejecting other users.
The solution: use Celery with Redis as the broker. Your web container then acts purely as a lightweight router, while independent worker processes (potentially on separate GPU servers) handle the heavy lifting.
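On top of the configuration below, a few Celery settings are worth enabling for long, GPU-bound renders. A hedged sketch (the dedicated `gpu` queue name is an assumption; `celery_app` is defined in `worker.py` below):

```python
# Optional hardening for long, GPU-bound renders (append to worker.py below).
# The dedicated "gpu" queue name is an assumption; adjust to your deployment.
celery_app.conf.update(
    task_acks_late=True,            # requeue the job if a worker dies mid-render
    worker_prefetch_multiplier=1,   # a busy worker should not hoard queued jobs
    task_routes={"worker.render_video_task": {"queue": "gpu"}},
)
```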
`worker.py` (Celery configuration)
```python
from celery import Celery
import os
import tempfile

from services.ai_pipeline import process_video_pipeline  # See core logic below

# Initialize Celery pointing at Redis (broker and result backend)
celery_app = Celery(
    "video_tasks",
    broker=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
    backend=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
)

@celery_app.task(bind=True, max_retries=3)
def render_video_task(self, job_id: str, video_path: str, script: str, ref_audio_path: str):
    from main import supabase  # Import inside the task to prevent circular imports

    try:
        # Mark the job as in progress
        supabase.table("video_jobs").update({"status": "processing"}).eq("id", job_id).execute()

        # Download files to local temp storage
        with tempfile.TemporaryDirectory() as tmpdir:
            local_video = os.path.join(tmpdir, "input.mp4")
            with open(local_video, "wb") as f:
                f.write(supabase.storage.from_("content").download(video_path))

            local_ref = None
            if ref_audio_path:
                local_ref = os.path.join(tmpdir, "ref.wav")
                with open(local_ref, "wb") as f:
                    f.write(supabase.storage.from_("content").download(ref_audio_path))

            # RUN CORE ML & FFMPEG LOGIC
            output_mp4 = process_video_pipeline(tmpdir, local_video, script, local_ref)

            # Upload the result
            result_path = f"rendered/{job_id}_final.mp4"
            with open(output_mp4, "rb") as f:
                supabase.storage.from_("content").upload(path=result_path, file=f.read())

        # Finish
        supabase.table("video_jobs").update({
            "status": "completed",
            "result_url": supabase.storage.from_("content").get_public_url(result_path),
        }).eq("id", job_id).execute()
    except Exception as e:
        supabase.table("video_jobs").update({"status": "failed", "error": str(e)}).eq("id", job_id).execute()
        raise
```
(To run a worker, execute `celery -A worker.celery_app worker --loglevel=info` in your terminal. On a GPU machine, consider adding `--concurrency=1` so a single render owns the GPU at a time.)
## 4. AI & Video Processing Logic (The Core)
This is the orchestration module handling Whisper, TTS, .ass generation, and FFmpeg.
`services/ai_pipeline.py`
```python
import os

import ffmpeg
from faster_whisper import WhisperModel
from vieneu import Vieneu

# Initialize models at module level so each Celery worker process loads them once.
# NOTE: Set the Vieneu mode appropriately (remote if hitting an LMDeploy server,
# otherwise turbo/standard).
tts = Vieneu()
# device = "cuda" if torch.cuda.is_available() else "cpu"
whisper_model = WhisperModel("base", device="cpu", compute_type="int8")

def process_video_pipeline(tmpdir: str, video_file: str, script: str, ref_audio: str = None) -> str:
    """Main orchestration function combining TTS, STT, and video rendering."""
    tts_audio_path = os.path.join(tmpdir, "tts_voiceover.wav")

    # 1. GENERATE THE VIENEU-TTS VOICEOVER
    if ref_audio:
        # Zero-shot voice clone from the reference audio
        my_voice = tts.encode_reference(ref_audio)
        audio_array = tts.infer(text=script, voice=my_voice)
    else:
        # Default voice
        audio_array = tts.infer(text=script)
    tts.save(audio_array, tts_audio_path)

    # 2. FASTER-WHISPER TIMESTAMP EXTRACTION
    # (We run Whisper on the pristine generated TTS audio, not the video audio,
    # for accurate timestamps.)
    segments, info = whisper_model.transcribe(tts_audio_path, word_timestamps=True, language="vi")
    words_data = []
    for segment in segments:
        for word in segment.words:
            words_data.append({
                "text": word.word.strip(),
                "start": word.start,
                "end": word.end,
            })

    # 3. GENERATE THE DYNAMIC .ASS SUBTITLE
    ass_path = os.path.join(tmpdir, "dynamic_subs.ass")
    generate_ass_file(words_data, ass_path)

    # 4. BURN IN WITH FFMPEG (replace audio & burn subs)
    output_video = os.path.join(tmpdir, "final_output.mp4")
    in_video = ffmpeg.input(video_file).video   # video stream only (drops original audio)
    in_audio = ffmpeg.input(tts_audio_path)     # new voiceover track

    # Burn the subtitles. Note: escaping the .ass path for libass is tricky across
    # Windows/Linux; ffmpeg-python handles it internally in most cases.
    video_with_subs = in_video.filter('ass', ass_path)
    (
        ffmpeg
        .output(video_with_subs, in_audio, output_video, vcodec="libx264", acodec="aac", audio_bitrate="192k")
        .overwrite_output()
        .run()
    )
    return output_video
def generate_ass_file(words_data: list, dest_path: str):
    """
    Takes an array of {text, start, end} dicts and writes an Advanced SubStation
    Alpha file with modern word-bounce (karaoke) color changes.
    """
    header = """[Script Info]
ScriptType: v4.00+
Collisions: Normal
PlayResX: 1920
PlayResY: 1080

[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Main,Arial,80,&H00FFFFFF,&H000000FF,&H00000000,&H80000000,-1,0,0,0,100,100,0,0,1,3,2,2,10,10,50,1

[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""

    def format_time(seconds: float) -> str:
        # ASS timestamp format: H:MM:SS.cs
        h = int(seconds // 3600)
        m = int((seconds % 3600) // 60)
        s = seconds % 60
        return f"{h}:{m:02d}:{s:05.2f}"

    lines = [header]

    # Chunk the words into 5-word phrases
    chunk_size = 5
    for i in range(0, len(words_data), chunk_size):
        chunk = words_data[i:i + chunk_size]

        # One event line per active word in the phrase: the same text is repeated,
        # but the active word is scaled up and recolored (TikTok-style popping).
        for active_idx, target_word in enumerate(chunk):
            w_start = target_word['start']
            w_end = target_word['end']

            line_text = ""
            for j, w in enumerate(chunk):
                if j == active_idx:
                    # Highlight the active word: scale to 120% and tint yellow
                    line_text += f"{{\\fscx120\\fscy120\\c&H00FDFF&}}{w['text']}{{\\fscx100\\fscy100\\c&HFFFFFF&}} "
                else:
                    line_text += f"{w['text']} "

            line_str = f"Dialogue: 0,{format_time(w_start)},{format_time(w_end)},Main,,0,0,0,,{line_text.strip()}\n"
            lines.append(line_str)

    with open(dest_path, "w", encoding="utf-8") as f:
        f.writelines(lines)
```
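For illustration, the events this function emits look like the following (hypothetical timestamps for a phrase beginning "Xin chào các bạn nhé"); each line repeats the phrase with a different word scaled and tinted:

```
Dialogue: 0,0:00:00.00,0:00:00.32,Main,,0,0,0,,{\fscx120\fscy120\c&H00FDFF&}Xin{\fscx100\fscy100\c&HFFFFFF&} chào các bạn nhé
Dialogue: 0,0:00:00.32,0:00:00.61,Main,,0,0,0,,Xin {\fscx120\fscy120\c&H00FDFF&}chào{\fscx100\fscy100\c&HFFFFFF&} các bạn nhé
```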
Setup Next Steps:
- Initialize the Python project and environment: `uv init`
- Install dependencies: `uv add fastapi uvicorn supabase celery redis python-multipart faster-whisper ffmpeg-python vieneu pydantic`
- Install external OS dependencies: `ffmpeg` and `redis-server`. Ensure your machine has CUDA drivers installed if testing GPU rendering.
- Once reviewed, let me know if you would like me to bootstrap these actual files and configure the workspace!