
Dynamic Video Subtitle & AI Voiceover Backend Architecture

This document provides the foundational architecture, system workflow, and code structure for a 100% Python-based video generation API backend using FastAPI, Supabase, Whisper, VieNeu, and FFmpeg.

1. System Architecture & Workflow Diagram

Video processing and AI inference are heavily CPU- and GPU-bound. We cannot block the FastAPI server's request handlers with this work, so it must be offloaded to a task queue.

Workflow:

  1. Client Upload: User uploads a raw video and script directly to Supabase Storage (via a presigned URL; see the sketch after the diagram below) or through FastAPI, which saves them to Supabase Storage.
  2. Task Creation: FastAPI receives the processing request (with the Supabase file path), saves a job record in the Supabase DB, and pushes a task to Redis.
  3. Queue / Message Broker: Redis acts as the broker holding pending video generation jobs.
  4. Celery Worker (The Core Engine):
    • Picks up the task.
    • Downloads the raw video/audio from Supabase.
    • Runs faster-whisper to extract word-level timestamps.
    • Runs VieNeu-TTS to generate the voiceover from the script.
    • Generates a dynamic .ass file mapping timestamps to bouncing subtitles.
    • Uses ffmpeg-python to merge the new audio, burn in the .ass subtitles, and output the final video.
    • Uploads the resulting .mp4 to Supabase.
    • Updates the Supabase DB row to completed.
  5. Client Notification: The frontend polls FastAPI (or uses websockets) for the job status and plays the final video.

```mermaid
sequenceDiagram
    participant User
    participant FastAPI
    participant Supabase
    participant Redis_Queue
    participant Celery_Worker

    User->>FastAPI: POST /generate (video, script, voice_ref)
    FastAPI->>Supabase: Save raw assets to Storage 
    FastAPI->>Supabase: Create job record (status: 'pending')
    FastAPI->>Redis_Queue: Enqueue 'render_video' task
    FastAPI-->>User: Return Job ID (Async Response)
    
    Celery_Worker->>Redis_Queue: Dequeue task
    Celery_Worker->>Supabase: Download raw assets
    Celery_Worker->>Celery_Worker: 1. faster-whisper (Timestamps)
    Celery_Worker->>Celery_Worker: 2. VieNeu-TTS (Voiceover)
    Celery_Worker->>Celery_Worker: 3. Generate dynamic .ass subtitle
    Celery_Worker->>Celery_Worker: 4. ffmpeg-python (Burn & Merge)
    Celery_Worker->>Supabase: Upload final video.mp4
    Celery_Worker->>Supabase: Update job status ('completed')
    
    User->>FastAPI: GET /status/{job_id}
    FastAPI->>Supabase: Check DB record
    FastAPI-->>User: Returns final video URL
```
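
Workflow step 1 mentions uploading directly to Storage via a presigned URL. A minimal sketch of that path, assuming supabase-py's `create_signed_upload_url` (the returned dict keys and the bucket/object paths here are assumptions/placeholders):

```python
import os
from supabase import create_client

supabase = create_client(os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_SERVICE_ROLE_KEY"))

# Mint a short-lived signed upload URL so large video files bypass the API
# server entirely; the frontend PUTs the bytes straight to Supabase Storage
# and then calls /api/v1/jobs/submit with only the storage path.
signed = supabase.storage.from_("content").create_signed_upload_url("raw_videos/clip.mp4")
upload_url, token = signed["signed_url"], signed["token"]  # hand these to the client
```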

2. FastAPI Setup & Supabase Integration

We will define the .env configuration and two simple endpoints.
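
A minimal .env sketch (values are placeholders; the variable names are exactly what the code below reads via os.getenv):

```
SUPABASE_URL=https://your-project.supabase.co
SUPABASE_SERVICE_ROLE_KEY=your-service-role-key
REDIS_URL=redis://localhost:6379/0
```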

main.py

```python
import os
from uuid import uuid4
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from pydantic import BaseModel
from supabase import create_client, Client
from worker import render_video_task  # Celery task

app = FastAPI(title="Video AI processing API")

# Setup Supabase
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY") # Use Service role for backend ops
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)

# Reserved for a possible JSON submission variant; the multipart endpoint
# below reads its fields via Form/File instead.
class RenderJobRequest(BaseModel):
    script_text: str
    voice_preset_id: str = "default"

@app.post("/api/v1/jobs/submit")
async def submit_job(
    script: str = Form(...),
    video: UploadFile = File(...),
    ref_audio: UploadFile | None = File(None)
):
    """
    Receives frontend files, stores them, and dispatches a Celery task.
    """
    # 1. Upload assets to Supabase Storage
    video_bytes = await video.read()
    video_path = f"raw_videos/{video.filename}"
    supabase.storage.from_("content").upload(path=video_path, file=video_bytes)
    
    ref_audio_path = None
    if ref_audio:
        ref_audio_bytes = await ref_audio.read()
        ref_audio_path = f"references/{ref_audio.filename}"
        supabase.storage.from_("content").upload(path=ref_audio_path, file=ref_audio_bytes)

    # 2. Create a DB record to track the job
    db_resp = supabase.table("video_jobs").insert({
        "status": "pending",
        "script": script,
        "raw_video_path": video_path
    }).execute()
    job_id = db_resp.data[0]["id"]
    
    # 3. Dispatch to Celery queue
    render_video_task.delay(job_id, video_path, script, ref_audio_path)

    return {"job_id": job_id, "status": "processing_queued"}

@app.get("/api/v1/jobs/{job_id}")
async def get_job_status(job_id: str):
    response = supabase.table("video_jobs").select("*").eq("id", job_id).execute()
    if not response.data:
        raise HTTPException(status_code=404, detail="Job not found")
    return response.data[0]
```
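
For reference, a minimal client-side sketch against these routes (uses httpx; the file name and script text are placeholders):

```python
import time
import httpx

API = "http://localhost:8000"

# Submit a job: multipart form with the script text plus the raw video file
with open("clip.mp4", "rb") as vf:
    resp = httpx.post(
        f"{API}/api/v1/jobs/submit",
        data={"script": "Xin chào các bạn!"},
        files={"video": ("clip.mp4", vf, "video/mp4")},
        timeout=60.0,
    )
job_id = resp.json()["job_id"]

# Poll the status endpoint until the worker finishes (a websocket or
# Supabase Realtime subscription would avoid polling)
while True:
    job = httpx.get(f"{API}/api/v1/jobs/{job_id}").json()
    if job["status"] in ("completed", "failed"):
        break
    time.sleep(2)
print(job.get("result_url") or job.get("error"))
```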

3. Task Management: Celery + Redis vs BackgroundTasks

Senior Advice: Never use FastAPI's native BackgroundTasks for heavy AI (Whisper/VieNeu) or rendering work. BackgroundTasks run inside the same process as your web application (sync tasks on its thread pool, async tasks on the event loop). A 3-minute video render will hog that process and lock up concurrent requests, freezing the server for every other user.

The Solution: Use Celery with Redis as a broker. This lets your web container act purely as a lightweight router, while independent worker processes (potentially on different GPU servers) handle the heavy lifting.

worker.py (Celery Configuration)

```python
import os
import tempfile

from celery import Celery

from services.ai_pipeline import process_video_pipeline  # Core logic in section 4

# Initialize Celery pointing to Redis
celery_app = Celery(
    "video_tasks",
    broker=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
    backend=os.getenv("REDIS_URL", "redis://localhost:6379/0")
)

@celery_app.task(bind=True, max_retries=3)
def render_video_task(self, job_id: str, video_path: str, script: str, ref_audio_path: str | None = None):
    from main import supabase  # Imported inside the task: main.py imports this module, so a top-level import would be circular
    
    try:
        # Update DB status
        supabase.table("video_jobs").update({"status": "processing"}).eq("id", job_id).execute()
        
        # Download files to local temp storage
        with tempfile.TemporaryDirectory() as tmpdir:
            local_video = os.path.join(tmpdir, "input.mp4")
            with open(local_video, "wb") as f:
                f.write(supabase.storage.from_("content").download(video_path))
                
            local_ref = None
            if ref_audio_path:
                local_ref = os.path.join(tmpdir, "ref.wav")
                with open(local_ref, "wb") as f:
                    f.write(supabase.storage.from_("content").download(ref_audio_path))
            
            # RUN CORE ML & FFMPEG LOGIC
            output_mp4 = process_video_pipeline(tmpdir, local_video, script, local_ref)
            
            # Upload Result
            result_path = f"rendered/{job_id}_final.mp4"
            with open(output_mp4, "rb") as f:
                supabase.storage.from_("content").upload(path=result_path, file=f.read())
                
            # Finish
            supabase.table("video_jobs").update({
                "status": "completed", 
                "result_url": supabase.storage.from_("content").get_public_url(result_path)
            }).eq("id", job_id).execute()

    except Exception as e:
        supabase.table("video_jobs").update({"status": "failed", "error": str(e)}).eq("id", job_id).execute()
        raise  # Re-raise so Celery also records the failure
```

(To run this, execute celery -A worker.celery_app worker --loglevel=info in your terminal.)
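
Note that the task declares max_retries=3 but the except block above marks the job failed on the first error. If transient failures (network hiccups during download/upload) should re-queue instead, here is a sketch of how Celery's self.retry works, using a standalone illustrative task:

```python
import urllib.request

from celery import Celery

retry_demo = Celery("retry_demo", broker="redis://localhost:6379/0")

@retry_demo.task(bind=True, max_retries=3)
def flaky_fetch(self, url: str) -> bytes:
    try:
        return urllib.request.urlopen(url, timeout=10).read()
    except OSError as exc:  # network errors are plausibly transient
        # Exponential backoff: retry after 10s, 20s, 40s; Celery marks the
        # task failed once max_retries is exhausted.
        raise self.retry(exc=exc, countdown=10 * (2 ** self.request.retries))
```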

4. AI & Video Processing Logic (The Core)

This is the orchestration module handling Whisper, TTS, .ass generation, and FFmpeg.

services/ai_pipeline.py

```python
import os

import ffmpeg
from faster_whisper import WhisperModel
from vieneu import Vieneu

# Initialize models at module level so each Celery worker process loads them only once
# NOTE: Set Vieneu mode appropriately (Remote if hitting LMDeploy server, else turbo/standard)
tts = Vieneu() 
# device = "cuda" if torch.cuda.is_available() else "cpu"
whisper_model = WhisperModel("base", device="cpu", compute_type="int8")

def process_video_pipeline(tmpdir: str, video_file: str, script: str, ref_audio: str = None) -> str:
    """
    Main orchestration function combining TTS, STT, and Video Rendering.
    """
    tts_audio_path = os.path.join(tmpdir, "tts_voiceover.wav")
    
    # 1. GENERATE VIENEU-TTS VOICEOVER
    if ref_audio:
        # Zero-shot clone
        my_voice = tts.encode_reference(ref_audio)
        audio_array = tts.infer(text=script, voice=my_voice)
    else:
        # Default voice
        audio_array = tts.infer(text=script)
        
    tts.save(audio_array, tts_audio_path)

    # 2. FASTER-WHISPER TIMESTAMP EXTRACTION 
    # (We run Whisper on the pristine generated TTS audio, not the original video
    # audio, so the word timestamps line up exactly with the new voiceover)
    segments, info = whisper_model.transcribe(tts_audio_path, word_timestamps=True, language="vi")
    
    words_data = []
    for segment in segments:
        for word in segment.words:
            words_data.append({
                "text": word.word.strip(),
                "start": word.start,
                "end": word.end
            })

    # 3. GENERATE DYNAMIC .ASS SUBTITLE
    ass_path = os.path.join(tmpdir, "dynamic_subs.ass")
    generate_ass_file(words_data, ass_path)
    
    # 4. BURN IN WITH FFMPEG (Replace Audio & Burn Subs)
    output_video = os.path.join(tmpdir, "final_output.mp4")
    
    # Video channel (without original audio)
    in_video = ffmpeg.input(video_file).video
    # New Voiceover channel
    in_audio = ffmpeg.input(tts_audio_path)
    
    # Burn-in subtitle filter.
    # Note: libass path escaping is tricky (drive letters, special characters);
    # keeping the .ass file on a plain temp path sidesteps most of it.
    video_with_subs = in_video.filter('ass', ass_path)
    
    (
        ffmpeg
        .output(video_with_subs, in_audio, output_video, vcodec="libx264", acodec="aac", audio_bitrate="192k")
        .overwrite_output()
        .run()
    )
    
    return output_video

def generate_ass_file(words_data: list, dest_path: str):
    """
    Takes an array of {text, start, end} and crafts an advanced SubStation Alpha file
    with modern word-bounce (karaoke) color changes.
    """
    header = """[Script Info]
ScriptType: v4.00+
Collisions: Normal
PlayResX: 1920
PlayResY: 1080

[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Main,Arial,80,&H00FFFFFF,&H000000FF,&H00000000,&H80000000,-1,0,0,0,100,100,0,0,1,3,2,2,10,10,50,1

[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
    
    def format_time(seconds: float) -> str:
        # Format: H:MM:SS.cs
        h = int(seconds // 3600)
        m = int((seconds % 3600) // 60)
        s = seconds % 60
        return f"{h}:{m:02d}:{s:05.2f}"

    lines = [header]
    
    # Chunk words into 5-word phrases
    chunk_size = 5
    for i in range(0, len(words_data), chunk_size):
        chunk = words_data[i:i+chunk_size]

        # Emit one Dialogue event per word in the phrase: each event repaints
        # the whole line with a different word highlighted, so the colour
        # "pops" cleanly from word to word (TikTok style).
        for active_idx, target_word in enumerate(chunk):
            w_start = target_word['start']
            # Hold the line until the next word starts (or the phrase ends) so
            # the subtitle doesn't flicker off during inter-word silences.
            if active_idx + 1 < len(chunk):
                w_end = chunk[active_idx + 1]['start']
            else:
                w_end = chunk[-1]['end']
            
            line_text = ""
            for j, w in enumerate(chunk):
                if j == active_idx:
                    # Highlight active word with scaling and Yellow Color
                    line_text += f"{{\\fscx120\\fscy120\\c&H00FDFF&}}{w['text']}{{\\fscx100\\fscy100\\c&HFFFFFF&}} "
                else:
                    line_text += f"{w['text']} "
            
            line_str = f"Dialogue: 0,{format_time(w_start)},{format_time(w_end)},Main,,0,0,0,,{line_text.strip()}\n"
            lines.append(line_str)

    with open(dest_path, "w", encoding="utf-8") as f:
        f.writelines(lines)
```
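
For orientation, here are the first two events the generator above would emit for a phrase (words and timings are illustrative, not real output):

```
Dialogue: 0,0:00:00.00,0:00:00.40,Main,,0,0,0,,{\fscx120\fscy120\c&H00FDFF&}Xin{\fscx100\fscy100\c&HFFFFFF&} chào các bạn nhé
Dialogue: 0,0:00:00.40,0:00:00.75,Main,,0,0,0,,Xin {\fscx120\fscy120\c&H00FDFF&}chào{\fscx100\fscy100\c&HFFFFFF&} các bạn nhé
```

Because each event repaints the full five-word line, only one Dialogue is ever active at a time, which keeps the rendering logic simple.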

Setup Next Steps:

  1. Initialize the Python project and environments: uv init
  2. Install dependencies: uv add fastapi uvicorn supabase celery redis python-multipart faster-whisper ffmpeg-python vieneu pydantic
  3. Install external OS dependencies: ffmpeg and redis-server (a quick sanity-check snippet follows this list). Ensure your machine has CUDA drivers installed if testing GPU rendering.
  4. Once reviewed, let me know if you would like me to bootstrap these actual files and configure the workspace!
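
A small sanity check before starting the worker, to confirm both OS dependencies are reachable (assumes the redis-py package from step 2 and the default local broker URL):

```python
import shutil
import redis

assert shutil.which("ffmpeg"), "ffmpeg binary not found on PATH"
redis.Redis.from_url("redis://localhost:6379/0").ping()  # raises if the broker is down
print("ffmpeg and redis are ready")
```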