# Dynamic Video Subtitle & AI Voiceover Backend Architecture

This document provides the foundational architecture, system workflow, and code structure for a 100% Python-based video generation API backend using FastAPI, Supabase, Whisper, VieNeu, and FFmpeg.

## 1. System Architecture & Workflow Diagram

Video processing and AI inference are heavily CPU- and GPU-bound. We cannot let this work block the FastAPI server, so these operations must be offloaded to a task queue.

**Workflow:**
1. **Client Upload:** User uploads a raw video and script directly to Supabase Storage (presigned URL) or via FastAPI, which saves it to Supabase.
2. **Task Creation:** FastAPI receives the processing request (with the Supabase file path), saves a job record in the Supabase DB, and pushes a task to Redis.
3. **Queue / Message Broker:** Redis acts as the broker holding pending video generation jobs.
4. **Celery Worker (The Core Engine):**
   - Picks up the task.
   - Downloads the raw video/audio from Supabase.
   - Runs **faster-whisper** to extract word-level timestamps.
   - Runs **VieNeu-TTS** to generate the voiceover from the script.
   - Generates a dynamic `.ass` file mapping timestamps to bouncing subtitles.
   - Uses **ffmpeg-python** to merge the new audio, burn in the `.ass` subtitles, and output the final video.
   - Uploads the resulting `.mp4` to Supabase.
   - Updates the Supabase DB row to `completed`.
5. **Client Notification:** The frontend polls FastAPI (or uses websockets) for the job status and plays the final video.

```mermaid
sequenceDiagram
    participant User
    participant FastAPI
    participant Supabase
    participant Redis_Queue
    participant Celery_Worker

    User->>FastAPI: POST /generate (video, script, voice_ref)
    FastAPI->>Supabase: Save raw assets to Storage 
    FastAPI->>Supabase: Create job record (status: 'pending')
    FastAPI->>Redis_Queue: Enqueue 'render_video' task
    FastAPI-->>User: Return Job ID (Async Response)
    
    Celery_Worker->>Redis_Queue: Dequeue task
    Celery_Worker->>Supabase: Download raw assets
    Celery_Worker->>Celery_Worker: 1. faster-whisper (Timestamps)
    Celery_Worker->>Celery_Worker: 2. VieNeu-TTS (Voiceover)
    Celery_Worker->>Celery_Worker: 3. Generate dynamic .ass subtitle
    Celery_Worker->>Celery_Worker: 4. ffmpeg-python (Burn & Merge)
    Celery_Worker->>Supabase: Upload final video.mp4
    Celery_Worker->>Supabase: Update job status ('completed')
    
    User->>FastAPI: GET /status/{job_id}
    FastAPI->>Supabase: Check DB record
    FastAPI-->>User: Return final video URL
```

## 2. FastAPI Setup & Supabase Integration

We will define `.env` configurations and simple endpoints.
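
The code below reads its configuration from environment variables. A minimal `.env` sketch — the variable names match the code, the values are placeholders:

```bash
# .env — placeholder values
SUPABASE_URL=https://your-project.supabase.co
SUPABASE_SERVICE_ROLE_KEY=your-service-role-key   # server-side only, never expose to a client
REDIS_URL=redis://localhost:6379/0
```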

### `main.py`
```python
import os
from fastapi import FastAPI, UploadFile, File, Form
from pydantic import BaseModel
from supabase import create_client, Client
from worker import render_video_task  # Celery task

app = FastAPI(title="Video AI processing API")

# Setup Supabase
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY") # Use Service role for backend ops
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)

class RenderJobRequest(BaseModel):
    script_text: str
    voice_preset_id: str = "default"

@app.post("/api/v1/jobs/submit")
async def submit_job(
    script: str = Form(...),
    video: UploadFile = File(...),
    ref_audio: UploadFile | None = File(None)
):
    """
    Receives frontend files, stores them, and dispatches a Celery task.
    """
    # 1. Upload assets to Supabase Storage
    video_bytes = await video.read()
    video_path = f"raw_videos/{video.filename}"  # NOTE: prefix with a UUID in production to avoid filename collisions
    supabase.storage.from_("content").upload(path=video_path, file=video_bytes)
    
    ref_audio_path = None
    if ref_audio:
        ref_audio_bytes = await ref_audio.read()
        ref_audio_path = f"references/{ref_audio.filename}"
        supabase.storage.from_("content").upload(path=ref_audio_path, file=ref_audio_bytes)

    # 2. Create DB record for the job track
    db_resp = supabase.table("video_jobs").insert({
        "status": "pending",
        "script": script,
        "raw_video_path": video_path
    }).execute()
    job_id = db_resp.data[0]["id"]
    
    # 3. Dispatch to Celery queue
    render_video_task.delay(job_id, video_path, script, ref_audio_path)

    return {"job_id": job_id, "status": "processing_queued"}

@app.get("/api/v1/jobs/{job_id}")
async def get_job_status(job_id: str):
    response = supabase.table("video_jobs").select("*").eq("id", job_id).execute()
    if not response.data:
        return {"error": "Job not found"}
    return response.data[0]
```
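
On the client side, the job endpoint is designed to be polled. Below is a minimal polling helper; `poll_job` and its parameters are illustrative (not part of the API above), and the fetch function is injected so it can wrap `GET /api/v1/jobs/{job_id}` via `requests` or `httpx`:

```python
import time

def poll_job(fetch, interval: float = 2.0, max_attempts: int = 150) -> dict:
    """Poll a job-status callable until it reports a terminal state.

    `fetch` is any zero-arg callable returning the job row as a dict,
    e.g. lambda: requests.get(f"{base}/api/v1/jobs/{job_id}").json().
    """
    for _ in range(max_attempts):
        job = fetch()
        if job.get("status") in ("completed", "failed"):
            return job
        time.sleep(interval)
    raise TimeoutError("job did not finish within the polling window")

# Example with a fake fetch that completes on the third call:
responses = iter([
    {"status": "pending"},
    {"status": "processing"},
    {"status": "completed", "result_url": "https://example.com/final.mp4"},
])
done = poll_job(lambda: next(responses), interval=0)
```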

## 3. Task Management: Celery + Redis vs BackgroundTasks

**Senior Advice:** Never use FastAPI's native `BackgroundTasks` for heavy AI work (Whisper/VieNeu) or rendering. `BackgroundTasks` run inside the same process as your web application, after the response is sent. A 3-minute video render will monopolize that process, starving concurrent requests and making the server appear frozen to other users.

**The Solution:** Use **Celery** with **Redis** as a broker. This lets your web container act purely as a lightweight router, while independent worker processes (potentially on different GPU servers) handle the heavy lifting.
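
A toy illustration of the blocking problem: synchronous CPU-bound work (simulated here with `time.sleep`) inside coroutines serializes the entire event loop, so two concurrent "requests" take the sum of their times instead of overlapping:

```python
import asyncio
import time

def fake_render(seconds: float) -> None:
    # Stand-in for a CPU-bound render; blocks the thread it runs on.
    time.sleep(seconds)

async def handler() -> None:
    fake_render(0.1)  # blocks the event loop: no other coroutine can run meanwhile

async def main() -> float:
    t0 = time.perf_counter()
    await asyncio.gather(handler(), handler())
    return time.perf_counter() - t0

elapsed = asyncio.run(main())
# The two handlers serialize: elapsed is ~0.2 s, not ~0.1 s.
```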

### `worker.py` (Celery Configuration)
```python
from celery import Celery
import os
import tempfile
from services.ai_pipeline import process_video_pipeline # See core logic below

# Initialize Celery pointing to Redis
celery_app = Celery(
    "video_tasks",
    broker=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
    backend=os.getenv("REDIS_URL", "redis://localhost:6379/0")
)

@celery_app.task(bind=True, max_retries=3)
def render_video_task(self, job_id: str, video_path: str, script: str, ref_audio_path: str):
    from main import supabase # Import inside task to prevent circular deps
    
    try:
        # Update DB status
        supabase.table("video_jobs").update({"status": "processing"}).eq("id", job_id).execute()
        
        # Download files to local temp storage
        with tempfile.TemporaryDirectory() as tmpdir:
            local_video = os.path.join(tmpdir, "input.mp4")
            with open(local_video, "wb") as f:
                f.write(supabase.storage.from_("content").download(video_path))
                
            local_ref = None
            if ref_audio_path:
                local_ref = os.path.join(tmpdir, "ref.wav")
                with open(local_ref, "wb") as f:
                    f.write(supabase.storage.from_("content").download(ref_audio_path))
            
            # RUN CORE ML & FFMPEG LOGIC
            output_mp4 = process_video_pipeline(tmpdir, local_video, script, local_ref)
            
            # Upload Result
            result_path = f"rendered/{job_id}_final.mp4"
            with open(output_mp4, "rb") as f:
                supabase.storage.from_("content").upload(path=result_path, file=f.read())
                
            # Finish
            supabase.table("video_jobs").update({
                "status": "completed", 
                "result_url": supabase.storage.from_("content").get_public_url(result_path)
            }).eq("id", job_id).execute()

    except Exception as e:
        supabase.table("video_jobs").update({"status": "failed", "error": str(e)}).eq("id", job_id).execute()
        raise e
```
*(To run this, execute `celery -A worker.celery_app worker --loglevel=info` in your terminal.)*
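
Note that the task above declares `max_retries=3` but never actually calls `self.retry`, so a transient failure (e.g. a Supabase network blip) goes straight to `failed`. A hedged sketch of how a retry with exponential backoff could be wired in — the `backoff_seconds` helper and its constants are illustrative, not part of Celery:

```python
def backoff_seconds(attempt: int, base: int = 30, cap: int = 600) -> int:
    """Exponential backoff: 30s, 60s, 120s, ... capped at 10 minutes."""
    return min(base * (2 ** attempt), cap)

# Inside render_video_task's except block, instead of a bare `raise`:
#     supabase.table("video_jobs").update({"status": "retrying"}).eq("id", job_id).execute()
#     raise self.retry(exc=e, countdown=backoff_seconds(self.request.retries))
```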

## 4. AI & Video Processing Logic (The Core)

This is the orchestration module handling Whisper, TTS, .ass generation, and FFmpeg.

### `services/ai_pipeline.py`
```python
import os
import ffmpeg
from faster_whisper import WhisperModel
from vieneu import Vieneu

# Initialize models at module level so each Celery worker process loads them only once
# NOTE: Set the VieNeu mode appropriately (Remote if hitting an LMDeploy server, else turbo/standard)
tts = Vieneu() 
# device = "cuda" if torch.cuda.is_available() else "cpu"
whisper_model = WhisperModel("base", device="cpu", compute_type="int8")

def process_video_pipeline(tmpdir: str, video_file: str, script: str, ref_audio: str = None) -> str:
    """
    Main orchestration function combining TTS, STT, and Video Rendering.
    """
    tts_audio_path = os.path.join(tmpdir, "tts_voiceover.wav")
    
    # 1. GENERATE VIENEU-TTS VOICEOVER
    if ref_audio:
        # Zero-shot clone
        my_voice = tts.encode_reference(ref_audio)
        audio_array = tts.infer(text=script, voice=my_voice)
    else:
        # Default voice
        audio_array = tts.infer(text=script)
        
    tts.save(audio_array, tts_audio_path)

    # 2. FASTER-WHISPER TIMESTAMP EXTRACTION 
    # (We run whisper on the generated pristine TTS audio, not the video audio, for perfect timestamps)
    segments, info = whisper_model.transcribe(tts_audio_path, word_timestamps=True, language="vi")
    
    words_data = []
    for segment in segments:
        for word in segment.words:
            words_data.append({
                "text": word.word.strip(),
                "start": word.start,
                "end": word.end
            })

    # 3. GENERATE DYNAMIC .ASS SUBTITLE
    ass_path = os.path.join(tmpdir, "dynamic_subs.ass")
    generate_ass_file(words_data, ass_path)
    
    # 4. BURN IN WITH FFMPEG (Replace Audio & Burn Subs)
    output_video = os.path.join(tmpdir, "final_output.mp4")
    
    # Video channel (without original audio)
    in_video = ffmpeg.input(video_file).video
    # New Voiceover channel
    in_audio = ffmpeg.input(tts_audio_path)
    
    # Burn-in subtitle filter.
    # Note: libass path escaping differs between Windows and Linux; ffmpeg-python
    # usually handles it, but paths with special characters may need manual escaping.
    video_with_subs = in_video.filter('ass', ass_path)
    
    (
        ffmpeg
        .output(video_with_subs, in_audio, output_video, vcodec="libx264", acodec="aac", audio_bitrate="192k")
        .overwrite_output()
        .run()
    )
    
    return output_video

def generate_ass_file(words_data: list, dest_path: str):
    """
    Takes an array of {text, start, end} and crafts an advanced SubStation Alpha file
    with modern word-bounce (karaoke) color changes.
    """
    header = """[Script Info]
ScriptType: v4.00+
Collisions: Normal
PlayResX: 1920
PlayResY: 1080

[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Main,Arial,80,&H00FFFFFF,&H000000FF,&H00000000,&H80000000,-1,0,0,0,100,100,0,0,1,3,2,2,10,10,50,1

[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""
    
    def format_time(seconds: float) -> str:
        # ASS time format: H:MM:SS.cc (centiseconds)
        h = int(seconds // 3600)
        m = int((seconds % 3600) // 60)
        s = seconds % 60
        return f"{h}:{m:02d}:{s:05.2f}"

    lines = [header]
    
    # Chunk words into 5-word phrases
    chunk_size = 5
    for i in range(0, len(words_data), chunk_size):
        chunk = words_data[i:i+chunk_size]
        # Re-emit the whole phrase once per word, each event lasting exactly that
        # word's duration, with the highlight on the active word (TikTok-style popping).
        
        for active_idx, target_word in enumerate(chunk):
            w_start = target_word['start']
            w_end = target_word['end']
            
            line_text = ""
            for j, w in enumerate(chunk):
                if j == active_idx:
                    # Highlight active word with scaling and Yellow Color
                    line_text += f"{{\\fscx120\\fscy120\\c&H00FDFF&}}{w['text']}{{\\fscx100\\fscy100\\c&HFFFFFF&}} "
                else:
                    line_text += f"{w['text']} "
            
            line_str = f"Dialogue: 0,{format_time(w_start)},{format_time(w_end)},Main,,0,0,0,,{line_text.strip()}\n"
            lines.append(line_str)

    with open(dest_path, "w", encoding="utf-8") as f:
        f.writelines(lines)
```
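
As a sanity check on the subtitle timing: ASS expects `H:MM:SS.cc` timestamps (centiseconds, hours unpadded). The helper from `generate_ass_file`, reproduced standalone:

```python
def format_time(seconds: float) -> str:
    # Same helper as in generate_ass_file: ASS expects H:MM:SS.cc (centiseconds).
    h = int(seconds // 3600)
    m = int((seconds % 3600) // 60)
    s = seconds % 60
    return f"{h}:{m:02d}:{s:05.2f}"

print(format_time(75.5))     # → 0:01:15.50
print(format_time(3661.25))  # → 1:01:01.25
```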

## Setup & Next Steps
1. Initialize the Python project and environments: `uv init`
2. Install dependencies: `uv add fastapi uvicorn supabase celery redis python-multipart faster-whisper ffmpeg-python vieneu pydantic`
3. Install external OS dependencies: `ffmpeg` and `redis-server`. Ensure your machine has CUDA drivers installed if testing GPU rendering.
4. Once reviewed, let me know if you would like me to bootstrap these actual files and configure the workspace!