# Dynamic Video Subtitle & AI Voiceover Backend Architecture
This document provides the foundational architecture, system workflow, and code structure for a 100% Python-based video generation API backend using FastAPI, Supabase, Whisper, VieNeu, and FFmpeg.
## 1. System Architecture & Workflow Diagram
Video processing and AI inference are heavily CPU- and GPU-bound. This means we cannot block the FastAPI server's event loop or worker threads; we must offload these operations to a task queue.
**Workflow:**
1. **Client Upload:** The user uploads a raw video and script directly to Supabase Storage (via a presigned URL; see the sketch after the diagram) or through FastAPI, which saves them to Supabase.
2. **Task Creation:** FastAPI receives the processing request (with the Supabase file path), saves a job record in the Supabase DB, and pushes a task to Redis.
3. **Queue / Message Broker:** Redis acts as the broker holding pending video generation jobs.
4. **Celery Worker (The Core Engine):**
- Picks up the task.
- Downloads the raw video/audio from Supabase.
- Runs **faster-whisper** to extract word-level timestamps.
- Runs **VieNeu-TTS** to generate the voiceover from the script.
- Generates a dynamic `.ass` file mapping timestamps to bouncing subtitles.
- Uses **ffmpeg-python** to merge the new audio, burn in the `.ass` subtitles, and output the final video.
- Uploads the resulting `.mp4` to Supabase.
- Updates the Supabase DB row to `completed`.
5. **Client Notification:** The frontend polls FastAPI (or uses websockets) for the job status and plays the final video.
```mermaid
sequenceDiagram
participant User
participant FastAPI
participant Supabase
participant Redis_Queue
participant Celery_Worker
User->>FastAPI: POST /generate (video, script, voice_ref)
FastAPI->>Supabase: Save raw assets to Storage
FastAPI->>Supabase: Create job record (status: 'pending')
FastAPI->>Redis_Queue: Enqueue 'render_video' task
FastAPI-->>User: Return Job ID (Async Response)
Celery_Worker->>Redis_Queue: Dequeue task
Celery_Worker->>Supabase: Download raw assets
Celery_Worker->>Celery_Worker: 1. faster-whisper (Timestamps)
Celery_Worker->>Celery_Worker: 2. VieNeu-TTS (Voiceover)
Celery_Worker->>Celery_Worker: 3. Generate dynamic .ass subtitle
Celery_Worker->>Celery_Worker: 4. ffmpeg-python (Burn & Merge)
Celery_Worker->>Supabase: Upload final video.mp4
Celery_Worker->>Supabase: Update job status ('completed')
User->>FastAPI: GET /status/{job_id}
FastAPI->>Supabase: Check DB record
FastAPI-->>User: Return final video URL
```
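For the direct-to-storage variant in step 1, the backend can hand the browser a one-time upload URL so large videos never pass through FastAPI. Below is a minimal sketch, assuming supabase-py's `create_signed_upload_url` helper (available in recent supabase-py/storage3 releases); the `content` bucket and `raw_videos/` prefix match the code in later sections:
```python
import os

from supabase import create_client

supabase = create_client(os.getenv("SUPABASE_URL"), os.getenv("SUPABASE_SERVICE_ROLE_KEY"))

# Issue a one-time signed upload URL for the client.
# The browser then uploads the file straight to Supabase Storage,
# so the FastAPI process never touches the raw bytes.
signed = supabase.storage.from_("content").create_signed_upload_url("raw_videos/clip.mp4")
# `signed` carries the URL/token the frontend uses for the direct upload.
```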
## 2. FastAPI Setup & Supabase Integration
We will define `.env` configurations and simple endpoints.
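For reference, the environment variables consumed by the code in this document might look like this (values are placeholders):
```env
SUPABASE_URL=https://your-project.supabase.co
SUPABASE_SERVICE_ROLE_KEY=your-service-role-key
REDIS_URL=redis://localhost:6379/0
```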
### `main.py`
```python
import os

from fastapi import FastAPI, UploadFile, File, Form
from pydantic import BaseModel
from supabase import create_client, Client

from worker import render_video_task  # Celery task

app = FastAPI(title="Video AI Processing API")

# Supabase setup (use the service-role key for backend operations)
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)


class RenderJobRequest(BaseModel):
    # Reserved for a future JSON-based submission endpoint;
    # the endpoint below accepts multipart form data instead.
    script_text: str
    voice_preset_id: str = "default"


@app.post("/api/v1/jobs/submit")
async def submit_job(
    script: str = Form(...),
    video: UploadFile = File(...),
    ref_audio: UploadFile = File(None),
):
    """Receives frontend files, stores them, and dispatches a Celery task."""
    # 1. Upload assets to Supabase Storage.
    #    (Consider prefixing filenames with a UUID to avoid collisions.)
    video_bytes = await video.read()
    video_path = f"raw_videos/{video.filename}"
    supabase.storage.from_("content").upload(path=video_path, file=video_bytes)

    ref_audio_path = None
    if ref_audio:
        ref_audio_bytes = await ref_audio.read()
        ref_audio_path = f"references/{ref_audio.filename}"
        supabase.storage.from_("content").upload(path=ref_audio_path, file=ref_audio_bytes)

    # 2. Create a DB record to track the job
    db_resp = supabase.table("video_jobs").insert({
        "status": "pending",
        "script": script,
        "raw_video_path": video_path,
    }).execute()
    job_id = db_resp.data[0]["id"]

    # 3. Dispatch to the Celery queue (returns immediately)
    render_video_task.delay(job_id, video_path, script, ref_audio_path)

    return {"job_id": job_id, "status": "processing_queued"}


@app.get("/api/v1/jobs/{job_id}")
async def get_job_status(job_id: str):
    response = supabase.table("video_jobs").select("*").eq("id", job_id).execute()
    if not response.data:
        return {"error": "Job not found"}
    return response.data[0]
```
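From the client side, submission and polling (step 5 of the workflow) might look like the following sketch, using the `requests` library against the endpoints above (server address and file names are hypothetical):
```python
import time

import requests

API = "http://localhost:8000"

# Submit the job as multipart form data, mirroring the endpoint signature above
with open("clip.mp4", "rb") as v:
    resp = requests.post(
        f"{API}/api/v1/jobs/submit",
        data={"script": "Xin chào, đây là video thử nghiệm."},
        files={"video": ("clip.mp4", v, "video/mp4")},
    )
job_id = resp.json()["job_id"]

# Poll until the worker marks the job completed or failed
while True:
    job = requests.get(f"{API}/api/v1/jobs/{job_id}").json()
    if job["status"] in ("completed", "failed"):
        break
    time.sleep(3)

print(job.get("result_url") or job.get("error"))
```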
## 3. Task Management: Celery + Redis vs BackgroundTasks
**Senior Advice:** Never use FastAPI's native `BackgroundTasks` for heavy AI inference (Whisper/VieNeu) or rendering work. `BackgroundTasks` run inside the same process as your web application, on its event loop or thread pool. A 3-minute video render would hog that process and starve concurrent requests, effectively freezing the server for other users.
**The Solution:** Use **Celery** with **Redis** as the broker. The web container then acts purely as a lightweight router, while independent worker processes (potentially on separate GPU servers) handle the heavy lifting.
### `worker.py` (Celery Configuration)
```python
import os
import tempfile

from celery import Celery

from services.ai_pipeline import process_video_pipeline  # See core logic below

# Initialize Celery pointing at Redis
# (the broker carries tasks; the backend stores task results/state)
celery_app = Celery(
    "video_tasks",
    broker=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
    backend=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
)


@celery_app.task(bind=True, max_retries=3)
def render_video_task(self, job_id: str, video_path: str, script: str, ref_audio_path: str):
    from main import supabase  # Import inside the task to prevent circular imports

    try:
        # Mark the job as in progress
        supabase.table("video_jobs").update({"status": "processing"}).eq("id", job_id).execute()

        # Download the raw assets to local temp storage
        with tempfile.TemporaryDirectory() as tmpdir:
            local_video = os.path.join(tmpdir, "input.mp4")
            with open(local_video, "wb") as f:
                f.write(supabase.storage.from_("content").download(video_path))

            local_ref = None
            if ref_audio_path:
                local_ref = os.path.join(tmpdir, "ref.wav")
                with open(local_ref, "wb") as f:
                    f.write(supabase.storage.from_("content").download(ref_audio_path))

            # Run the core ML & FFmpeg pipeline
            output_mp4 = process_video_pipeline(tmpdir, local_video, script, local_ref)

            # Upload the rendered result
            result_path = f"rendered/{job_id}_final.mp4"
            with open(output_mp4, "rb") as f:
                supabase.storage.from_("content").upload(path=result_path, file=f.read())

        # Mark as completed with a public URL
        supabase.table("video_jobs").update({
            "status": "completed",
            "result_url": supabase.storage.from_("content").get_public_url(result_path),
        }).eq("id", job_id).execute()
    except Exception as e:
        supabase.table("video_jobs").update({"status": "failed", "error": str(e)}).eq("id", job_id).execute()
        raise
```
*(To run a worker, execute `celery -A worker.celery_app worker --loglevel=info` in your terminal. For GPU workers, consider `--concurrency=1` so only one render runs per device at a time.)*
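Because a Redis result backend is configured above, Celery's own task state is queryable alongside the DB record. A minimal optional sketch using Celery's standard `AsyncResult` API (argument values are placeholders for real job data):
```python
from worker import celery_app, render_video_task

# .delay() returns immediately with an AsyncResult handle
async_result = render_video_task.delay("job-123", "raw_videos/clip.mp4", "Xin chào", None)

# Anywhere with access to the same Redis backend, the raw Celery state
# (PENDING / STARTED / SUCCESS / FAILURE) can be read back by task id:
print(celery_app.AsyncResult(async_result.id).state)
```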
## 4. AI & Video Processing Logic (The Core)
This is the orchestration module handling Whisper, TTS, .ass generation, and FFmpeg.
### `services/ai_pipeline.py`
```python
import os

import ffmpeg
from faster_whisper import WhisperModel
from vieneu import Vieneu

# Initialize models at module level so each Celery worker process loads them only once.
# NOTE: Set the Vieneu mode appropriately (remote if hitting an LMDeploy server,
# otherwise turbo/standard).
tts = Vieneu()
# device = "cuda" if torch.cuda.is_available() else "cpu"
whisper_model = WhisperModel("base", device="cpu", compute_type="int8")


def process_video_pipeline(tmpdir: str, video_file: str, script: str, ref_audio: str = None) -> str:
    """Main orchestration function combining TTS, STT, and video rendering."""
    tts_audio_path = os.path.join(tmpdir, "tts_voiceover.wav")

    # 1. GENERATE THE VIENEU-TTS VOICEOVER
    if ref_audio:
        # Zero-shot voice clone from the reference sample
        my_voice = tts.encode_reference(ref_audio)
        audio_array = tts.infer(text=script, voice=my_voice)
    else:
        # Default voice
        audio_array = tts.infer(text=script)
    tts.save(audio_array, tts_audio_path)

    # 2. FASTER-WHISPER TIMESTAMP EXTRACTION
    # (We run Whisper on the pristine generated TTS audio, not the video audio,
    # for perfectly aligned timestamps.)
    segments, info = whisper_model.transcribe(tts_audio_path, word_timestamps=True, language="vi")
    words_data = []
    for segment in segments:
        for word in segment.words:
            words_data.append({
                "text": word.word.strip(),
                "start": word.start,
                "end": word.end,
            })

    # 3. GENERATE THE DYNAMIC .ASS SUBTITLE FILE
    ass_path = os.path.join(tmpdir, "dynamic_subs.ass")
    generate_ass_file(words_data, ass_path)

    # 4. BURN IN WITH FFMPEG (replace audio & burn subtitles)
    output_video = os.path.join(tmpdir, "final_output.mp4")
    # Video stream only (drops the original audio track)
    in_video = ffmpeg.input(video_file).video
    # New voiceover stream
    in_audio = ffmpeg.input(tts_audio_path)
    # Burn-in subtitle filter.
    # Note: escaping the path for libass is tricky across Windows/Linux;
    # ffmpeg-python handles it internally in most cases.
    video_with_subs = in_video.filter("ass", ass_path)
    (
        ffmpeg
        .output(video_with_subs, in_audio, output_video,
                vcodec="libx264", acodec="aac", audio_bitrate="192k")
        .overwrite_output()
        .run()
    )
    return output_video


def generate_ass_file(words_data: list, dest_path: str):
    """
    Takes a list of {text, start, end} dicts and writes an Advanced SubStation Alpha
    file with word-by-word bounce (karaoke) highlighting.
    """
    header = """[Script Info]
ScriptType: v4.00+
Collisions: Normal
PlayResX: 1920
PlayResY: 1080

[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Main,Arial,80,&H00FFFFFF,&H000000FF,&H00000000,&H80000000,-1,0,0,0,100,100,0,0,1,3,2,2,10,10,50,1

[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""

    def format_time(seconds: float) -> str:
        # ASS timestamp format: H:MM:SS.cs (centiseconds)
        h = int(seconds // 3600)
        m = int((seconds % 3600) // 60)
        s = seconds % 60
        return f"{h}:{m:02d}:{s:05.2f}"

    lines = [header]
    # Chunk words into 5-word phrases
    chunk_size = 5
    for i in range(0, len(words_data), chunk_size):
        chunk = words_data[i:i + chunk_size]
        # One Dialogue event per word: each event shows the whole phrase for the
        # duration of its active word, with that word scaled up and recolored,
        # producing a clean TikTok-style "popping" highlight as playback advances.
        for active_idx, target_word in enumerate(chunk):
            w_start = target_word["start"]
            w_end = target_word["end"]
            line_text = ""
            for j, w in enumerate(chunk):
                if j == active_idx:
                    # Highlight the active word: scale to 120% and tint yellow (BGR &H00FDFF&)
                    line_text += f"{{\\fscx120\\fscy120\\c&H00FDFF&}}{w['text']}{{\\fscx100\\fscy100\\c&HFFFFFF&}} "
                else:
                    line_text += f"{w['text']} "
            lines.append(
                f"Dialogue: 0,{format_time(w_start)},{format_time(w_end)},Main,,0,0,0,,{line_text.strip()}\n"
            )

    with open(dest_path, "w", encoding="utf-8") as f:
        f.writelines(lines)
```
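To sanity-check the subtitle generator in isolation, you can feed it synthetic word timings, as in the sketch below. (Note that importing `services.ai_pipeline` loads the TTS and Whisper models at import time, so run this inside the worker environment.)
```python
from services.ai_pipeline import generate_ass_file

# Synthetic word timings standing in for faster-whisper output
words = [
    {"text": "Xin", "start": 0.00, "end": 0.35},
    {"text": "chào", "start": 0.35, "end": 0.80},
    {"text": "các", "start": 0.80, "end": 1.10},
    {"text": "bạn", "start": 1.10, "end": 1.60},
]

generate_ass_file(words, "preview.ass")

# Preview against any test clip with the same filter the pipeline uses:
#   ffplay -vf "ass=preview.ass" test_clip.mp4
print(open("preview.ass", encoding="utf-8").read())
```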
## 5. Setup & Next Steps
1. Initialize the Python project and environments: `uv init`
2. Install dependencies: `uv add fastapi uvicorn supabase celery redis python-multipart faster-whisper ffmpeg-python vieneu pydantic`
3. Install external OS dependencies: `ffmpeg` and `redis-server`. Ensure your machine has CUDA drivers installed if testing GPU rendering.
4. Once the plan is reviewed, let me know if you would like me to bootstrap the actual files and configure the workspace!