# Dynamic Video Subtitle & AI Voiceover Backend Architecture

This document provides the foundational architecture, system workflow, and code structure for a 100% Python-based video generation API backend using FastAPI, Supabase, Whisper, VieNeu, and FFmpeg.

## 1. System Architecture & Workflow Diagram

Video processing and AI inference are heavily CPU- and GPU-bound. This means we cannot block the FastAPI server's request handlers; these operations must be offloaded to a task queue.

**Workflow:**

1. **Client Upload:** The user uploads a raw video and script either directly to Supabase Storage (via a presigned URL; see the sketch after the diagram) or through FastAPI, which saves it to Supabase.
2. **Task Creation:** FastAPI receives the processing request (with the Supabase file path), saves a job record in the Supabase DB, and pushes a task to Redis.
3. **Queue / Message Broker:** Redis acts as the broker holding pending video generation jobs.
4. **Celery Worker (The Core Engine):**
   - Picks up the task.
   - Downloads the raw video/audio from Supabase.
   - Runs **faster-whisper** to extract word-level timestamps.
   - Runs **VieNeu-TTS** to generate the voiceover from the script.
   - Generates a dynamic `.ass` file mapping timestamps to bouncing subtitles.
   - Uses **ffmpeg-python** to merge the new audio, burn in the `.ass` subtitles, and output the final video.
   - Uploads the resulting `.mp4` to Supabase.
   - Updates the Supabase DB row to `completed`.
5. **Client Notification:** The frontend polls FastAPI (or uses websockets) for the job status and plays the final video.

```mermaid
sequenceDiagram
    participant User
    participant FastAPI
    participant Supabase
    participant Redis_Queue
    participant Celery_Worker

    User->>FastAPI: POST /generate (video, script, voice_ref)
    FastAPI->>Supabase: Save raw assets to Storage
    FastAPI->>Supabase: Create job record (status: 'pending')
    FastAPI->>Redis_Queue: Enqueue 'render_video' task
    FastAPI-->>User: Return Job ID (Async Response)

    Celery_Worker->>Redis_Queue: Dequeue task
    Celery_Worker->>Supabase: Download raw assets
    Celery_Worker->>Celery_Worker: 1. faster-whisper (Timestamps)
    Celery_Worker->>Celery_Worker: 2. VieNeu-TTS (Voiceover)
    Celery_Worker->>Celery_Worker: 3. Generate dynamic .ass subtitle
    Celery_Worker->>Celery_Worker: 4. ffmpeg-python (Burn & Merge)
    Celery_Worker->>Supabase: Upload final video.mp4
    Celery_Worker->>Supabase: Update job status ('completed')

    User->>FastAPI: GET /status/{job_id}
    FastAPI->>Supabase: Check DB record
    FastAPI-->>User: Return final video URL
```
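The direct-to-storage path in step 1 keeps large uploads off the API server entirely. Below is a minimal sketch of an endpoint that issues a signed upload URL, reusing the `app` and `supabase` objects defined in section 2, and assuming your supabase-py / storage3 version exposes `create_signed_upload_url` (the bucket name and route path are illustrative):

```python
# Sketch: issue a signed upload URL so the client can upload the raw video
# straight to Supabase Storage (workflow step 1), bypassing FastAPI.
# Assumes the storage client exposes create_signed_upload_url(); reuses the
# `app` and `supabase` objects from main.py in section 2.
import uuid

@app.post("/api/v1/uploads/presign")
async def presign_upload(filename: str):
    # Namespace the object by a random UUID to avoid filename collisions
    object_path = f"raw_videos/{uuid.uuid4()}_{filename}"
    signed = supabase.storage.from_("content").create_signed_upload_url(object_path)
    # The client uploads directly using the signed URL, then submits
    # `object_path` to /jobs/submit instead of the file bytes.
    return {"path": object_path, "upload": signed}
```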
## 2. FastAPI Setup & Supabase Integration

We will define `.env`-based configuration and simple endpoints.

### `main.py`

```python
import os
from typing import Optional

from fastapi import FastAPI, UploadFile, File, Form
from pydantic import BaseModel
from supabase import create_client, Client

from worker import render_video_task  # Celery task

app = FastAPI(title="Video AI Processing API")

# Set up Supabase. Use the service-role key for backend operations.
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)


class RenderJobRequest(BaseModel):
    script_text: str
    voice_preset_id: str = "default"


@app.post("/api/v1/jobs/submit")
async def submit_job(
    script: str = Form(...),
    video: UploadFile = File(...),
    ref_audio: Optional[UploadFile] = File(None)
):
    """
    Receives frontend files, stores them, and dispatches a Celery task.
    """
    # 1. Upload assets to Supabase Storage
    video_bytes = await video.read()
    video_path = f"raw_videos/{video.filename}"
    supabase.storage.from_("content").upload(path=video_path, file=video_bytes)

    ref_audio_path = None
    if ref_audio:
        ref_audio_bytes = await ref_audio.read()
        ref_audio_path = f"references/{ref_audio.filename}"
        supabase.storage.from_("content").upload(path=ref_audio_path, file=ref_audio_bytes)

    # 2. Create a DB record to track the job
    db_resp = supabase.table("video_jobs").insert({
        "status": "pending",
        "script": script,
        "raw_video_path": video_path
    }).execute()
    job_id = db_resp.data[0]["id"]

    # 3. Dispatch to the Celery queue
    render_video_task.delay(job_id, video_path, script, ref_audio_path)

    return {"job_id": job_id, "status": "processing_queued"}


@app.get("/api/v1/jobs/{job_id}")
async def get_job_status(job_id: str):
    response = supabase.table("video_jobs").select("*").eq("id", job_id).execute()
    if not response.data:
        return {"error": "Job not found"}
    return response.data[0]
```

## 3. Task Management: Celery + Redis vs. BackgroundTasks

**Senior Advice:** Never use FastAPI's native `BackgroundTasks` for heavy AI inference (Whisper/VieNeu) or rendering work. `BackgroundTasks` runs inside the same process as your web application (on the event loop or its thread pool), so a 3-minute video render will monopolize it and block concurrent requests, freezing the server for other users.

**The Solution:** Use **Celery** with **Redis** as a broker. This lets your web container act purely as a lightweight router, while independent worker processes (potentially on different GPU servers) handle the heavy lifting.

### `worker.py` (Celery Configuration)

```python
import os
import tempfile

from celery import Celery

from services.ai_pipeline import process_video_pipeline  # See core logic below

# Initialize Celery pointing to Redis as both broker and result backend
celery_app = Celery(
    "video_tasks",
    broker=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
    backend=os.getenv("REDIS_URL", "redis://localhost:6379/0")
)


@celery_app.task(bind=True, max_retries=3)
def render_video_task(self, job_id: str, video_path: str, script: str, ref_audio_path: str):
    from main import supabase  # Import inside the task to prevent circular imports

    try:
        # Update DB status
        supabase.table("video_jobs").update({"status": "processing"}).eq("id", job_id).execute()

        # Download files to local temp storage
        with tempfile.TemporaryDirectory() as tmpdir:
            local_video = os.path.join(tmpdir, "input.mp4")
            with open(local_video, "wb") as f:
                f.write(supabase.storage.from_("content").download(video_path))

            local_ref = None
            if ref_audio_path:
                local_ref = os.path.join(tmpdir, "ref.wav")
                with open(local_ref, "wb") as f:
                    f.write(supabase.storage.from_("content").download(ref_audio_path))

            # RUN CORE ML & FFMPEG LOGIC
            output_mp4 = process_video_pipeline(tmpdir, local_video, script, local_ref)

            # Upload the result
            result_path = f"rendered/{job_id}_final.mp4"
            with open(output_mp4, "rb") as f:
                supabase.storage.from_("content").upload(path=result_path, file=f.read())

            # Finish
            supabase.table("video_jobs").update({
                "status": "completed",
                "result_url": supabase.storage.from_("content").get_public_url(result_path)
            }).eq("id", job_id).execute()

    except Exception as e:
        supabase.table("video_jobs").update({"status": "failed", "error": str(e)}).eq("id", job_id).execute()
        raise
```

*(To run the worker, execute `celery -A worker.celery_app worker --loglevel=info` in your terminal.)*
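Note that `max_retries=3` on the decorator only takes effect if the task actually calls `self.retry(...)`; as written above, any exception marks the job failed on the first attempt. Below is a minimal sketch of a retry pattern for transient failures, where `TransientStorageError` is a hypothetical stand-in for whatever retryable errors your storage client raises, and the backoff values are illustrative:

```python
# Sketch: retrying transient failures with exponential backoff.
# max_retries on the decorator is only honored when self.retry() is called;
# otherwise the first exception is final.

class TransientStorageError(Exception):
    """Hypothetical exception type standing in for a retryable failure."""

@celery_app.task(bind=True, max_retries=3)
def render_video_task(self, job_id: str, video_path: str, script: str, ref_audio_path: str):
    from main import supabase
    try:
        ...  # download, pipeline, upload -- identical to the task above
    except TransientStorageError as e:
        # Re-enqueue with exponential backoff: waits 60s, 120s, then 240s
        # across the three allowed retries before giving up for good.
        raise self.retry(exc=e, countdown=60 * (2 ** self.request.retries))
    except Exception as e:
        # Permanent failure: record it so the client sees the error
        supabase.table("video_jobs").update(
            {"status": "failed", "error": str(e)}
        ).eq("id", job_id).execute()
        raise
```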
## 4. AI & Video Processing Logic (The Core)

This is the orchestration module handling Whisper, TTS, `.ass` generation, and FFmpeg.

### `services/ai_pipeline.py`

```python
import os

import ffmpeg
from faster_whisper import WhisperModel
from vieneu import Vieneu

# Initialize models at module level so each Celery worker loads them only once.
# NOTE: Set the Vieneu mode appropriately (Remote if hitting an LMDeploy
# server, otherwise turbo/standard).
tts = Vieneu()
# device = "cuda" if torch.cuda.is_available() else "cpu"
whisper_model = WhisperModel("base", device="cpu", compute_type="int8")


def process_video_pipeline(tmpdir: str, video_file: str, script: str, ref_audio: str = None) -> str:
    """
    Main orchestration function combining TTS, STT, and video rendering.
    """
    tts_audio_path = os.path.join(tmpdir, "tts_voiceover.wav")

    # 1. GENERATE VIENEU-TTS VOICEOVER
    if ref_audio:
        # Zero-shot voice clone from the reference audio
        my_voice = tts.encode_reference(ref_audio)
        audio_array = tts.infer(text=script, voice=my_voice)
    else:
        # Default voice
        audio_array = tts.infer(text=script)
    tts.save(audio_array, tts_audio_path)

    # 2. FASTER-WHISPER TIMESTAMP EXTRACTION
    # (We run Whisper on the pristine generated TTS audio, not the video
    # audio, for perfect timestamps.)
    segments, info = whisper_model.transcribe(tts_audio_path, word_timestamps=True, language="vi")

    words_data = []
    for segment in segments:
        for word in segment.words:
            words_data.append({
                "text": word.word.strip(),
                "start": word.start,
                "end": word.end
            })

    # 3. GENERATE THE DYNAMIC .ASS SUBTITLE FILE
    ass_path = os.path.join(tmpdir, "dynamic_subs.ass")
    generate_ass_file(words_data, ass_path)

    # 4. BURN IN WITH FFMPEG (replace audio & burn subtitles)
    output_video = os.path.join(tmpdir, "final_output.mp4")

    # Video stream only (drops the original audio)
    in_video = ffmpeg.input(video_file).video
    # New voiceover stream
    in_audio = ffmpeg.input(tts_audio_path)

    # Burn-in subtitle filter.
    # Note: escaping the path for libass is tricky across Windows/Linux;
    # ffmpeg-python handles it internally most of the time.
    video_with_subs = in_video.filter('ass', ass_path)

    (
        ffmpeg
        .output(video_with_subs, in_audio, output_video,
                vcodec="libx264", acodec="aac", audio_bitrate="192k")
        .overwrite_output()
        .run()
    )

    return output_video


def generate_ass_file(words_data: list, dest_path: str):
    """
    Takes an array of {text, start, end} and crafts an Advanced SubStation
    Alpha file with modern word-bounce (karaoke-style) color changes.
    """
    header = """[Script Info]
ScriptType: v4.00+
Collisions: Normal
PlayResX: 1920
PlayResY: 1080

[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Main,Arial,80,&H00FFFFFF,&H000000FF,&H00000000,&H80000000,-1,0,0,0,100,100,0,0,1,3,2,2,10,10,50,1

[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""

    def format_time(seconds: float) -> str:
        # .ass timestamps use the format H:MM:SS.cs (centiseconds)
        h = int(seconds // 3600)
        m = int((seconds % 3600) // 60)
        s = seconds % 60
        return f"{h}:{m:02d}:{s:05.2f}"

    lines = [header]

    # Chunk words into 5-word phrases
    chunk_size = 5
    for i in range(0, len(words_data), chunk_size):
        chunk = words_data[i:i + chunk_size]

        # Emit one Dialogue event per word: each event renders the whole
        # phrase, but re-styles the currently spoken word so its color and
        # size "pop" cleanly (TikTok style).
        for active_idx, target_word in enumerate(chunk):
            w_start = target_word['start']
            w_end = target_word['end']

            line_text = ""
            for j, w in enumerate(chunk):
                if j == active_idx:
                    # Highlight the active word: scale it up and color it
                    # yellow, then reset scale/color for the rest of the line
                    line_text += f"{{\\fscx120\\fscy120\\c&H00FDFF&}}{w['text']}{{\\fscx100\\fscy100\\c&HFFFFFF&}} "
                else:
                    line_text += f"{w['text']} "

            line_str = f"Dialogue: 0,{format_time(w_start)},{format_time(w_end)},Main,,0,0,0,,{line_text.strip()}\n"
            lines.append(line_str)

    with open(dest_path, "w", encoding="utf-8") as f:
        f.writelines(lines)
```
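When the burn-in step misbehaves, it helps to inspect the exact command ffmpeg-python builds before it runs. A minimal sketch using `ffmpeg.compile()`, with placeholder file paths, that prints the equivalent CLI invocation without executing anything:

```python
# Sketch: inspect the ffmpeg command that ffmpeg-python will execute.
# File paths here are placeholders for illustration.
import ffmpeg

in_video = ffmpeg.input("input.mp4").video
in_audio = ffmpeg.input("tts_voiceover.wav")
video_with_subs = in_video.filter('ass', "dynamic_subs.ass")

stream = ffmpeg.output(
    video_with_subs, in_audio, "final_output.mp4",
    vcodec="libx264", acodec="aac", audio_bitrate="192k"
).overwrite_output()

# Prints something like:
# ffmpeg -i input.mp4 -i tts_voiceover.wav
#   -filter_complex "[0:v]ass=dynamic_subs.ass[s0]" -map [s0] -map 1 ...
print(" ".join(ffmpeg.compile(stream)))
```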
## Setup Next Steps

1. Initialize the Python project and environment: `uv init`
2. Install dependencies: `uv add fastapi uvicorn supabase celery redis python-multipart faster-whisper ffmpeg-python vieneu pydantic`
3. Install external OS dependencies: `ffmpeg` and `redis-server`. Ensure your machine has CUDA drivers installed if testing GPU rendering.
4. Once reviewed, let me know if you would like me to bootstrap these actual files and configure the workspace!
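Once the stack is running (API, Redis, and a Celery worker), the submit-and-poll flow from workflow step 5 can be smoke-tested end to end. A minimal sketch using `requests`, assuming the API is served locally on port 8000 and `sample.mp4` is a placeholder file name:

```python
# Sketch: submit a render job and poll its status.
# Assumes `uvicorn main:app` on localhost:8000; file names are placeholders.
import time
import requests

API = "http://localhost:8000/api/v1"

with open("sample.mp4", "rb") as video:
    resp = requests.post(
        f"{API}/jobs/submit",
        data={"script": "Xin chào, đây là video thử nghiệm."},
        files={"video": ("sample.mp4", video, "video/mp4")},
    )
job_id = resp.json()["job_id"]

# Poll until the worker flips the status to 'completed' or 'failed'
while True:
    job = requests.get(f"{API}/jobs/{job_id}").json()
    if job.get("status") in ("completed", "failed"):
        print(job.get("result_url") or job.get("error"))
        break
    time.sleep(5)
```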