# Dynamic Video Subtitle & AI Voiceover Backend Architecture

This document provides the foundational architecture, system workflow, and code structure for a 100% Python-based video generation API backend using FastAPI, Supabase, Whisper, VieNeu, and FFmpeg.
## 1. System Architecture & Workflow Diagram

Video processing and AI inference are heavily CPU- and GPU-bound. This means we cannot block the FastAPI server threads; these operations must be offloaded to a task queue.
**Workflow:**

1. **Client Upload:** The user uploads a raw video and script directly to Supabase Storage (via a presigned URL) or through FastAPI, which saves them to Supabase.
2. **Task Creation:** FastAPI receives the processing request (with the Supabase file path), saves a job record in the Supabase DB, and pushes a task to Redis.
3. **Queue / Message Broker:** Redis acts as the broker holding pending video generation jobs.
4. **Celery Worker (The Core Engine):**
   - Picks up the task.
   - Downloads the raw video/audio from Supabase.
   - Runs **faster-whisper** to extract word-level timestamps.
   - Runs **VieNeu-TTS** to generate the voiceover from the script.
   - Generates a dynamic `.ass` file mapping timestamps to bouncing subtitles.
   - Uses **ffmpeg-python** to merge the new audio, burn in the `.ass` subtitles, and output the final video.
   - Uploads the resulting `.mp4` to Supabase.
   - Updates the Supabase DB row to `completed`.
5. **Client Notification:** The frontend polls FastAPI (or uses websockets) for the job status and plays the final video.
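The polling loop in step 5 can be sketched in a few lines. This is a hypothetical client-side helper, not part of the backend code below; the `fetch_status` callable stands in for a real HTTP call to the status endpoint (e.g. `requests.get(...).json()`):

```python
import time


def poll_job(job_id: str, fetch_status, interval: float = 2.0, timeout: float = 600.0) -> dict:
    """Poll the jobs endpoint until the job finishes or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        job = fetch_status(job_id)  # e.g. requests.get(f"{API}/api/v1/jobs/{job_id}").json()
        if job["status"] in ("completed", "failed"):
            return job
        time.sleep(interval)
    raise TimeoutError(f"Job {job_id} did not finish within {timeout}s")


# Stubbed example: the job "completes" on the third poll
responses = iter([
    {"status": "pending"},
    {"status": "processing"},
    {"status": "completed", "result_url": "https://example.com/final.mp4"},
])
result = poll_job("job-123", lambda _id: next(responses), interval=0.01)
```

In production a websocket or Supabase Realtime subscription avoids the polling interval entirely, at the cost of connection management.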
```mermaid
sequenceDiagram
    participant User
    participant FastAPI
    participant Supabase
    participant Redis_Queue
    participant Celery_Worker
    User->>FastAPI: POST /generate (video, script, voice_ref)
    FastAPI->>Supabase: Save raw assets to Storage
    FastAPI->>Supabase: Create job record (status: 'pending')
    FastAPI->>Redis_Queue: Enqueue 'render_video' task
    FastAPI-->>User: Return Job ID (async response)
    Celery_Worker->>Redis_Queue: Dequeue task
    Celery_Worker->>Supabase: Download raw assets
    Celery_Worker->>Celery_Worker: 1. faster-whisper (timestamps)
    Celery_Worker->>Celery_Worker: 2. VieNeu-TTS (voiceover)
    Celery_Worker->>Celery_Worker: 3. Generate dynamic .ass subtitle
    Celery_Worker->>Celery_Worker: 4. ffmpeg-python (burn & merge)
    Celery_Worker->>Supabase: Upload final video.mp4
    Celery_Worker->>Supabase: Update job status ('completed')
    User->>FastAPI: GET /status/{job_id}
    FastAPI->>Supabase: Check DB record
    FastAPI-->>User: Return final video URL
```
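The `video_jobs` table referenced throughout can be created in Supabase with a schema along these lines. This is a hypothetical sketch inferred from the columns the code reads and writes (`id`, `status`, `script`, `raw_video_path`, `result_url`, `error`); adjust types and defaults to your needs:

```sql
-- Hypothetical Postgres schema for Supabase; not prescribed by the code, just consistent with it
create table video_jobs (
    id uuid primary key default gen_random_uuid(),
    status text not null default 'pending',  -- pending | processing | completed | failed
    script text,
    raw_video_path text,
    result_url text,
    error text,
    created_at timestamptz not null default now()
);
```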
## 2. FastAPI Setup & Supabase Integration

We will define `.env` configuration and simple endpoints.
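A minimal `.env` for local development might look like the following (all values are placeholders; the variable names match what the code reads via `os.getenv`):

```ini
SUPABASE_URL=https://your-project.supabase.co
SUPABASE_SERVICE_ROLE_KEY=your-service-role-key
REDIS_URL=redis://localhost:6379/0
```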
### `main.py`
```python
import os

from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from supabase import Client, create_client

from worker import render_video_task  # Celery task

app = FastAPI(title="Video AI Processing API")

# Supabase client (use the service-role key for backend operations)
SUPABASE_URL = os.getenv("SUPABASE_URL")
SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)


@app.post("/api/v1/jobs/submit")
async def submit_job(
    script: str = Form(...),
    video: UploadFile = File(...),
    ref_audio: UploadFile | None = File(None),
):
    """Receives frontend files, stores them, and dispatches a Celery task."""
    # 1. Upload assets to Supabase Storage
    # NOTE: paths derived from client filenames can collide across users;
    # consider namespacing by job or user ID in production.
    video_bytes = await video.read()
    video_path = f"raw_videos/{video.filename}"
    supabase.storage.from_("content").upload(path=video_path, file=video_bytes)

    ref_audio_path = None
    if ref_audio:
        ref_audio_bytes = await ref_audio.read()
        ref_audio_path = f"references/{ref_audio.filename}"
        supabase.storage.from_("content").upload(path=ref_audio_path, file=ref_audio_bytes)

    # 2. Create a DB record to track the job
    db_resp = supabase.table("video_jobs").insert({
        "status": "pending",
        "script": script,
        "raw_video_path": video_path,
    }).execute()
    job_id = db_resp.data[0]["id"]

    # 3. Dispatch to the Celery queue
    render_video_task.delay(job_id, video_path, script, ref_audio_path)

    return {"job_id": job_id, "status": "processing_queued"}


@app.get("/api/v1/jobs/{job_id}")
async def get_job_status(job_id: str):
    response = supabase.table("video_jobs").select("*").eq("id", job_id).execute()
    if not response.data:
        raise HTTPException(status_code=404, detail="Job not found")
    return response.data[0]
```
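One caveat in `submit_job` above: storage paths are derived directly from the client filename, so two users uploading `video.mp4` overwrite each other. A small collision-safe path helper (hypothetical, not part of the endpoint code) could look like this:

```python
import uuid
from pathlib import PurePosixPath


def unique_storage_path(prefix: str, filename: str) -> str:
    """Build a collision-safe Supabase Storage path: <prefix>/<uuid>_<safe-name>."""
    safe_name = PurePosixPath(filename).name or "upload.bin"  # strip any directory parts
    return f"{prefix}/{uuid.uuid4().hex}_{safe_name}"


p = unique_storage_path("raw_videos", "my clip.mp4")
```

The `PurePosixPath(...).name` step also defends against path traversal in client-supplied filenames.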
| ## 3. Task Management: Celery + Redis vs BackgroundTasks | |
| **Senior Advice:** Never use FastAPI's native `BackgroundTasks` for Heavy AI (Whisper/VieNeu) or Rendering computations. `BackgroundTasks` run in the exact same event loop / process pool as your web application. A 3-minute video render will hog the worker and completely lock up concurrent requests, causing the server to freeze and reject other users. | |
| **The Solution:** Use **Celery** with **Redis** as a broker. This lets your web container act purely as a lightweight router, while independent worker processes (potentially on different GPU servers) handle the heavy lifting. | |
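One practical consequence of the broker-based design: task arguments cross a process boundary through Redis, and Celery's default serializer is JSON. This is one reason the task receives the Supabase *storage path* rather than raw video bytes. A quick check of the payload shape (illustrative values only):

```python
import json

# The kind of payload render_video_task.delay(...) would serialize
payload = {
    "job_id": "job-123",
    "video_path": "raw_videos/input.mp4",
    "script": "Xin chào các bạn!",
    "ref_audio_path": None,
}

# JSON round-trips cleanly; raw bytes or open file handles would not
restored = json.loads(json.dumps(payload))
```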
### `worker.py` (Celery Configuration)

```python
import os
import tempfile

from celery import Celery

from services.ai_pipeline import process_video_pipeline  # See core logic below

# Initialize Celery pointing at Redis (both broker and result backend)
celery_app = Celery(
    "video_tasks",
    broker=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
    backend=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
)


@celery_app.task(bind=True, max_retries=3)
def render_video_task(self, job_id: str, video_path: str, script: str, ref_audio_path: str | None):
    from main import supabase  # Imported inside the task to prevent circular imports

    try:
        # Mark the job as in progress
        supabase.table("video_jobs").update({"status": "processing"}).eq("id", job_id).execute()

        # Download assets to local temp storage
        with tempfile.TemporaryDirectory() as tmpdir:
            local_video = os.path.join(tmpdir, "input.mp4")
            with open(local_video, "wb") as f:
                f.write(supabase.storage.from_("content").download(video_path))

            local_ref = None
            if ref_audio_path:
                local_ref = os.path.join(tmpdir, "ref.wav")
                with open(local_ref, "wb") as f:
                    f.write(supabase.storage.from_("content").download(ref_audio_path))

            # Run the core ML & FFmpeg logic
            output_mp4 = process_video_pipeline(tmpdir, local_video, script, local_ref)

            # Upload the result
            result_path = f"rendered/{job_id}_final.mp4"
            with open(output_mp4, "rb") as f:
                supabase.storage.from_("content").upload(path=result_path, file=f.read())

        # Mark the job as completed
        supabase.table("video_jobs").update({
            "status": "completed",
            "result_url": supabase.storage.from_("content").get_public_url(result_path),
        }).eq("id", job_id).execute()
    except Exception as e:
        supabase.table("video_jobs").update({"status": "failed", "error": str(e)}).eq("id", job_id).execute()
        raise  # Re-raise (preserving the traceback) so Celery records the failure
```
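Note that the task declares `max_retries=3` but never actually retries: for transient failures (e.g. a Supabase download timeout) you would call `self.retry(exc=e, countdown=...)` in the `except` block instead of re-raising. A sketch of the exponential backoff schedule one might pass as `countdown`, assuming a 30-second base and a 10-minute cap (both arbitrary choices):

```python
def retry_countdown(retries: int, base: float = 30.0, cap: float = 600.0) -> float:
    """Seconds to wait before retry attempt `retries` (0-based): base * 2**retries, capped."""
    return min(cap, base * (2 ** retries))


# The wait before each of the first four attempts
schedule = [retry_countdown(n) for n in range(4)]  # [30.0, 60.0, 120.0, 240.0]
```

Inside the task this would be used as `raise self.retry(exc=e, countdown=retry_countdown(self.request.retries))`, reserving the hard `failed` status for the final attempt.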
*(To run the worker, execute `celery -A worker.celery_app worker --loglevel=info` in your terminal.)*
## 4. AI & Video Processing Logic (The Core)

This is the orchestration module handling Whisper, TTS, `.ass` generation, and FFmpeg.

### `services/ai_pipeline.py`
```python
import os

import ffmpeg
from faster_whisper import WhisperModel
from vieneu import Vieneu

# Initialize models at module level so each Celery worker loads them only once.
# NOTE: Set the Vieneu mode appropriately (Remote if hitting an LMDeploy server, else turbo/standard).
tts = Vieneu()
# device = "cuda" if torch.cuda.is_available() else "cpu"
whisper_model = WhisperModel("base", device="cpu", compute_type="int8")


def process_video_pipeline(tmpdir: str, video_file: str, script: str, ref_audio: str = None) -> str:
    """Main orchestration function combining TTS, STT, and video rendering."""
    tts_audio_path = os.path.join(tmpdir, "tts_voiceover.wav")

    # 1. GENERATE THE VIENEU-TTS VOICEOVER
    if ref_audio:
        # Zero-shot voice clone from the reference audio
        my_voice = tts.encode_reference(ref_audio)
        audio_array = tts.infer(text=script, voice=my_voice)
    else:
        # Default voice
        audio_array = tts.infer(text=script)
    tts.save(audio_array, tts_audio_path)

    # 2. FASTER-WHISPER TIMESTAMP EXTRACTION
    # We run Whisper on the pristine generated TTS audio (not the video audio)
    # so the word timestamps align perfectly with the voiceover.
    segments, _info = whisper_model.transcribe(tts_audio_path, word_timestamps=True, language="vi")
    words_data = []
    for segment in segments:
        for word in segment.words:
            words_data.append({
                "text": word.word.strip(),
                "start": word.start,
                "end": word.end,
            })

    # 3. GENERATE THE DYNAMIC .ASS SUBTITLE
    ass_path = os.path.join(tmpdir, "dynamic_subs.ass")
    generate_ass_file(words_data, ass_path)

    # 4. BURN IN WITH FFMPEG (replace audio & burn subtitles)
    output_video = os.path.join(tmpdir, "final_output.mp4")
    # Video stream only (drop the original audio)
    in_video = ffmpeg.input(video_file).video
    # New voiceover stream
    in_audio = ffmpeg.input(tts_audio_path)
    # Burn-in subtitle filter.
    # NOTE: escaping the path for libass is tricky across Windows/Linux;
    # ffmpeg-python handles it internally in most cases.
    video_with_subs = in_video.filter('ass', ass_path)
    (
        ffmpeg
        .output(video_with_subs, in_audio, output_video, vcodec="libx264", acodec="aac", audio_bitrate="192k")
        .overwrite_output()
        .run()
    )
    return output_video


def generate_ass_file(words_data: list, dest_path: str):
    """
    Takes a list of {text, start, end} dicts and writes an Advanced SubStation Alpha file
    with word-level karaoke highlighting (TikTok-style popping).
    """
    header = """[Script Info]
ScriptType: v4.00+
Collisions: Normal
PlayResX: 1920
PlayResY: 1080

[V4+ Styles]
Format: Name, Fontname, Fontsize, PrimaryColour, SecondaryColour, OutlineColour, BackColour, Bold, Italic, Underline, StrikeOut, ScaleX, ScaleY, Spacing, Angle, BorderStyle, Outline, Shadow, Alignment, MarginL, MarginR, MarginV, Encoding
Style: Main,Arial,80,&H00FFFFFF,&H000000FF,&H00000000,&H80000000,-1,0,0,0,100,100,0,0,1,3,2,2,10,10,50,1

[Events]
Format: Layer, Start, End, Style, Name, MarginL, MarginR, MarginV, Effect, Text
"""

    def format_time(seconds: float) -> str:
        # ASS timestamp format: H:MM:SS.cc (centiseconds)
        h = int(seconds // 3600)
        m = int((seconds % 3600) // 60)
        s = seconds % 60
        return f"{h}:{m:02d}:{s:05.2f}"

    lines = [header]
    # Chunk words into 5-word phrases
    chunk_size = 5
    for i in range(0, len(words_data), chunk_size):
        chunk = words_data[i:i + chunk_size]
        # Emit one Dialogue event per active word: each event shows the whole phrase
        # for that word's duration, with only the active word scaled up and recoloured.
        for active_idx, target_word in enumerate(chunk):
            w_start = target_word['start']
            w_end = target_word['end']
            line_text = ""
            for j, w in enumerate(chunk):
                if j == active_idx:
                    # Highlight the active word: scale to 120% and colour it yellow (&HBBGGRR)
                    line_text += f"{{\\fscx120\\fscy120\\c&H00FDFF&}}{w['text']}{{\\fscx100\\fscy100\\c&HFFFFFF&}} "
                else:
                    line_text += f"{w['text']} "
            lines.append(
                f"Dialogue: 0,{format_time(w_start)},{format_time(w_end)},Main,,0,0,0,,{line_text.strip()}\n"
            )

    with open(dest_path, "w", encoding="utf-8") as f:
        f.writelines(lines)
```
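The two pure helpers above (`.ass` timestamp formatting and 5-word chunking) are easy to verify in isolation. This standalone sketch mirrors their logic:

```python
def format_time(seconds: float) -> str:
    """ASS timestamp: H:MM:SS.cc (centisecond precision)."""
    h = int(seconds // 3600)
    m = int((seconds % 3600) // 60)
    s = seconds % 60
    return f"{h}:{m:02d}:{s:05.2f}"


def chunk_words(words: list, size: int = 5) -> list:
    """Split a word list into phrases of at most `size` words."""
    return [words[i:i + size] for i in range(0, len(words), size)]


t = format_time(3725.5)                      # 1 h, 2 min, 5.5 s -> "1:02:05.50"
phrases = chunk_words(list("abcdefg"), 5)    # [["a","b","c","d","e"], ["f","g"]]
```

Spot-checking these helpers matters because a malformed timestamp silently desynchronizes every subtitle line downstream.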
## Setup: Next Steps

1. Initialize the Python project and environment: `uv init`
2. Install Python dependencies: `uv add fastapi uvicorn supabase celery redis python-multipart faster-whisper ffmpeg-python vieneu pydantic`
3. Install OS-level dependencies: `ffmpeg` and `redis-server`. Ensure the machine has CUDA drivers installed if testing GPU rendering.
4. Once reviewed, let me know if you would like me to bootstrap these files and configure the workspace!