File size: 10,121 Bytes
6e413b2
911c66e
 
 
6376ca1
 
6e413b2
 
911c66e
 
 
 
6376ca1
6e413b2
911c66e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6e413b2
 
 
 
 
 
 
 
911c66e
 
 
 
 
6e413b2
 
 
 
 
 
 
 
 
 
 
 
 
911c66e
6e413b2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6376ca1
 
911c66e
6376ca1
 
 
 
 
 
 
911c66e
 
 
 
 
 
 
6376ca1
 
 
 
 
 
 
 
911c66e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6376ca1
911c66e
 
 
 
 
 
 
 
 
 
 
 
6376ca1
 
911c66e
 
 
 
6376ca1
911c66e
 
 
 
 
6376ca1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
911c66e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
import os
import re
import unicodedata
import soundfile as sf
from dotenv import load_dotenv
load_dotenv(override=True)
from celery import Celery
import tempfile
import sys
import redis
from contextlib import contextmanager
from services.ai_pipeline import process_video_pipeline, generate_tts_only, process_studio_pipeline
from supabase import create_client, Client

# Module-level Redis connection; ProgressCatcher writes per-job progress keys
# through it. The URL is read from REDIS_URL and falls back to a local
# instance (database 0) when the variable is unset.
redis_client = redis.from_url(os.getenv("REDIS_URL", "redis://localhost:6379/0"))

class ProgressCatcher:
    """File-like wrapper that mirrors writes and publishes progress to Redis.

    Every chunk of text is forwarded unchanged to ``original_stream``. When a
    chunk contains a ``[<elapsed><<remaining>`` timing fragment (for example
    ``[00:05<00:10``), the two values are stored in Redis under the key
    ``progress_<job_id>`` as ``"<elapsed>|<remaining>"``.

    Attributes that this class does not define itself (``isatty``,
    ``encoding``, ``fileno``, ...) are looked up on the wrapped stream, so the
    wrapper can stand in for ``sys.stderr`` without breaking code that probes
    the stream.
    """

    def __init__(self, original_stream, job_id):
        self.original_stream = original_stream
        self.job_id = job_id

    def write(self, text):
        """Forward ``text`` to the wrapped stream and record progress timing.

        Returns:
            Whatever the wrapped stream's ``write`` returned (the number of
            characters written for standard text streams).
        """
        written = self.original_stream.write(text)
        # Only "digits:digits" on both sides of "<" is recognised.
        # NOTE(review): longer forms such as "1:00:05" will not match, so no
        # update is published for them -- confirm whether that matters.
        match = re.search(r'\[(\d+:\d+)<(\d+:\d+)', text)
        if match:
            elapsed = match.group(1)
            remaining = match.group(2)
            redis_client.set(f"progress_{self.job_id}", f"{elapsed}|{remaining}")
        return written

    def flush(self):
        """Flush the wrapped stream."""
        self.original_stream.flush()

    def __getattr__(self, name):
        # Reached only when normal attribute lookup fails. The guard prevents
        # infinite recursion if the instance exists without ``__init__``
        # having run (e.g. while being copied).
        if name == "original_stream":
            raise AttributeError(name)
        return getattr(self.original_stream, name)

@contextmanager
def catch_progress(job_id):
    """Route ``sys.stderr`` through a ``ProgressCatcher`` for the ``with`` body.

    The stream that was installed on entry is put back on exit, whether the
    body finishes normally or raises.
    """
    saved_stream = sys.stderr
    interceptor = ProgressCatcher(saved_stream, job_id)
    sys.stderr = interceptor
    try:
        yield
    finally:
        # Always restore, so later output is no longer scanned for progress.
        sys.stderr = saved_stream

def slugify(text: str) -> str:
    """Turn free text into a short ASCII slug suitable for a file name.

    Diacritics are stripped, characters other than word characters,
    whitespace and hyphens are removed, the text is lower-cased, and each run
    of whitespace/hyphens becomes a single underscore.

    Args:
        text: Arbitrary text, e.g. Vietnamese with diacritics.

    Returns:
        The slug, truncated to at most 30 characters. May be empty when the
        input contains nothing that survives the clean-up.
    """
    # "đ"/"Đ" have no canonical decomposition, so the NFD + ASCII-ignore step
    # below would drop them entirely ("Đường" -> "uong"). Map them explicitly.
    text = text.replace("đ", "d").replace("Đ", "D")
    # Decompose accented letters and discard the combining marks.
    text = unicodedata.normalize('NFD', text).encode('ascii', 'ignore').decode("utf-8")
    # Remove special characters, lower-case, then collapse whitespace and
    # hyphens into underscores.
    text = re.sub(r'[^\w\s-]', '', text).strip().lower()
    text = re.sub(r'[-\s]+', '_', text)
    return text[:30]

# Celery application named "video_tasks". The same Redis URL (REDIS_URL, or a
# local instance on database 0 when the variable is unset) is used as both the
# message broker and the result backend.
celery_app = Celery(
    "video_tasks",
    broker=os.getenv("REDIS_URL", "redis://localhost:6379/0"),
    backend=os.getenv("REDIS_URL", "redis://localhost:6379/0")
)

@celery_app.task(bind=True, max_retries=3)
def render_video_task(self, job_id: str, script: str, ref_audio_path: str, aspect_ratio: str, sub_style: str, font_name: str, highlight_color: str):
    """Render a video for ``job_id`` and publish it to Supabase storage.

    The job row in ``video_jobs`` is moved to ``processing``, the optional
    reference audio is downloaded, ``process_video_pipeline`` produces the
    MP4, the file is uploaded to ``rendered/<job_id>_final.mp4`` in the
    ``content`` bucket, and the row is marked ``completed`` with its public
    URL.

    Args:
        self: Bound task instance (``bind=True``). This body never calls
            ``self.retry``.
        job_id: ``id`` of the row in ``video_jobs`` to update.
        script: Text handed to the pipeline.
        ref_audio_path: Path of a reference audio object in the ``content``
            bucket; a falsy value skips the download.
        aspect_ratio, sub_style, font_name, highlight_color: Passed through
            to ``process_video_pipeline`` unchanged.

    Raises:
        Exception: Any failure is recorded on the job row (status ``failed``
            plus the message) and then re-raised.
    """
    # Setup Supabase client per worker (avoid circular dependency with main.py)
    SUPABASE_URL = os.getenv("SUPABASE_URL")
    SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
    supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
    try:
        # Update DB status
        supabase.table("video_jobs").update({"status": "processing"}).eq("id", job_id).execute()

        # Work inside a temp dir so downloads and intermediates are cleaned up.
        with tempfile.TemporaryDirectory() as tmpdir:
            local_ref = None
            if ref_audio_path:
                local_ref = os.path.join(tmpdir, "ref.wav")
                with open(local_ref, "wb") as f:
                    f.write(supabase.storage.from_("content").download(ref_audio_path))

            # RUN CORE ML & FFMPEG LOGIC
            output_mp4 = process_video_pipeline(tmpdir, script, local_ref, aspect_ratio, sub_style, font_name, highlight_color)

            # Upload the result. Pass the open handle instead of reading the
            # whole video into memory, and declare the MIME type explicitly
            # (as the other upload calls in this file do) so the object is
            # not stored under the storage client's default content type.
            result_path = f"rendered/{job_id}_final.mp4"
            with open(output_mp4, "rb") as f:
                supabase.storage.from_("content").upload(
                    path=result_path,
                    file=f,
                    file_options={"content-type": "video/mp4"},
                )

            # Finish
            supabase.table("video_jobs").update({
                "status": "completed",
                "result_url": supabase.storage.from_("content").get_public_url(result_path)
            }).eq("id", job_id).execute()

    except Exception as e:
        # NOTE(review): this task records "failed" while the other tasks in
        # this file record "error" -- confirm which value consumers expect.
        supabase.table("video_jobs").update({"status": "failed", "error": str(e)}).eq("id", job_id).execute()
        # Bare raise keeps the original traceback intact.
        raise

def _clean_tts_script(script: str) -> str:
    """Strip bracketed notes and normalise punctuation/whitespace for TTS."""
    # Remove [bracketed notes] so they are not read aloud. Tags of the form
    # [p:<number>], [v:<number>] and [s:<number>] are kept.
    # NOTE(review): the original comment lists "[v:voice]" as preserved, but
    # the pattern only spares numeric values -- confirm intended tag syntax.
    script = re.sub(r'\[(?!(?:p|v|s):\d*\.?\d*\]).*?\]', '', script).strip()
    # Runs of two or more dots become a single comma.
    script = re.sub(r'\.{2,}', ',', script)
    return re.sub(r'\s+', ' ', script).strip()


def _trim_reference_audio(path: str, max_seconds: int = 15) -> None:
    """Best-effort: keep only the first ``max_seconds`` of the audio at ``path``."""
    # Long reference clips are cut to avoid running out of memory (OOM) in
    # OnnxRuntime (per the original comment).
    try:
        data, samplerate = sf.read(path)
        # For multi-channel audio keep only the first channel.
        if len(data.shape) > 1:
            data = data[:, 0]

        max_samples = max_seconds * samplerate
        if len(data) > max_samples:
            print(f"DEBUG: Audio mẫu quá dài ({len(data)/samplerate:.2f}s), tự động cắt còn {max_seconds}s.")
            data = data[:max_samples]
            # The file is rewritten only when it was actually truncated.
            sf.write(path, data, samplerate)
    except Exception as trim_err:
        # Deliberately non-fatal: synthesis proceeds with the untrimmed file.
        print(f"Warning: Không thể cắt audio mẫu: {trim_err}")


@celery_app.task
def generate_tts_task(job_id: str, script: str, voice: str, temperature: float, ref_audio_path: str = None, bgm_path: str = None, bgm_volume: float = 0.1):
    """Synthesize speech for ``job_id`` and publish the audio to Supabase storage.

    The job row in ``video_jobs`` is moved to ``processing``, the script is
    cleaned, an optional reference voice and background music are fetched,
    ``generate_tts_only`` produces the audio, and the file is uploaded to
    ``results/<slug>_<first 8 chars of job_id>.wav`` in the ``content``
    bucket before the row is marked ``completed``.

    Args:
        job_id: ``id`` of the row in ``video_jobs`` to update.
        script: Text to speak; bracketed notes are removed before synthesis.
        voice: Used as a reference only when it is an existing local path
            ending in ``.mp3`` or ``.wav`` and ``ref_audio_path`` is not given.
        temperature: Passed through to ``generate_tts_only``.
        ref_audio_path: Path of a reference audio object in the ``content``
            bucket; takes precedence over ``voice``.
        bgm_path: Background music: an existing local path is used directly,
            anything else is downloaded from the ``content`` bucket.
        bgm_volume: Passed through to ``generate_tts_only``.

    Raises:
        Exception: Any failure is printed, recorded on the job row (status
            ``error`` plus the message) and then re-raised.
    """
    # Setup Supabase client per worker
    SUPABASE_URL = os.getenv("SUPABASE_URL")
    SUPABASE_KEY = os.getenv("SUPABASE_SERVICE_ROLE_KEY")
    supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)

    try:
        # Both steps live inside the try so that a failure here (for example
        # a non-string script) is recorded on the job row like any other.
        supabase.table("video_jobs").update({"status": "processing"}).eq("id", job_id).execute()
        script = _clean_tts_script(script)

        with tempfile.TemporaryDirectory() as tmpdir:
            # 1. Obtain the reference audio, if any
            local_ref_path = None
            if ref_audio_path:
                local_ref_path = os.path.join(tmpdir, "input_ref.wav")
                with open(local_ref_path, 'wb') as f:
                    f.write(supabase.storage.from_("content").download(ref_audio_path))
            elif voice and voice.endswith((".mp3", ".wav")) and os.path.exists(voice):
                # Use local static voice file as reference (copy to tmpdir to
                # avoid overwriting the original when trimming)
                import shutil
                local_ref_path = os.path.join(tmpdir, "static_ref.wav")
                shutil.copy(voice, local_ref_path)

            if local_ref_path:
                _trim_reference_audio(local_ref_path)

            # 2. Obtain background music, if provided
            local_bgm_path = None
            if bgm_path:
                if os.path.exists(bgm_path):
                    local_bgm_path = bgm_path
                else:
                    local_bgm_path = os.path.join(tmpdir, "bgm.mp3")
                    with open(local_bgm_path, 'wb') as f:
                        f.write(supabase.storage.from_("content").download(bgm_path))

            # 3. Run Pure TTS Engine
            result_audio_local = generate_tts_only(tmpdir, script, local_ref_path, temperature, local_bgm_path, bgm_volume)

            # 4. Upload Result Audio
            # Build a friendly file name from the slug of the cleaned script
            friendly_name = slugify(script)
            final_audio_path = f"results/{friendly_name}_{job_id[:8]}.wav"

            with open(result_audio_local, 'rb') as f:
                supabase.storage.from_("content").upload(
                    path=final_audio_path,
                    file=f,
                    file_options={"content-type": "audio/wav"}
                )

            public_url = supabase.storage.from_("content").get_public_url(final_audio_path)

            # 5. Mark job as complete
            supabase.table("video_jobs").update({
                "status": "completed",
                "result_url": public_url
            }).eq("id", job_id).execute()

    except Exception as e:
        import traceback
        traceback.print_exc()

        supabase.table("video_jobs").update({
            "status": "error",
            "error": str(e)
        }).eq("id", job_id).execute()
        # Bare raise keeps the original traceback intact.
        raise

@celery_app.task
def render_studio_task(job_id: str, script: str, temperature: float = 0.5, voice_preset: str = "default", bgm_path: str = None, bgm_volume: float = 0.1):
    """
    Celery task: render a Studio MP4 for ``job_id`` and publish it.

    The job row in ``video_jobs`` goes to ``processing``;
    ``process_studio_pipeline`` runs while stderr is scanned for progress;
    the MP4 is uploaded to
    ``results/studio_<slug>_<first 8 chars of job_id>.mp4`` in the
    ``content`` bucket and the row is marked ``completed`` with its public
    URL. On any exception the traceback is printed, the row is marked
    ``error`` with the message, and the exception is re-raised.
    """
    # One Supabase client per task invocation.
    supabase: Client = create_client(
        os.getenv("SUPABASE_URL"),
        os.getenv("SUPABASE_SERVICE_ROLE_KEY"),
    )

    def _update_job(fields):
        # Write the given columns onto this job's row.
        supabase.table("video_jobs").update(fields).eq("id", job_id).execute()

    try:
        _update_job({"status": "processing"})

        with tempfile.TemporaryDirectory() as workdir:
            with catch_progress(job_id):
                # Resolve background music: an existing local path is used
                # as-is, anything else is fetched from the "content" bucket.
                bgm_local = None
                if bgm_path and os.path.exists(bgm_path):
                    bgm_local = bgm_path
                elif bgm_path:
                    bgm_local = os.path.join(workdir, "bgm.mp3")
                    with open(bgm_local, 'wb') as sink:
                        sink.write(supabase.storage.from_("content").download(bgm_path))

                rendered = process_studio_pipeline(workdir, script, temperature, voice_preset, bgm_local, bgm_volume)

            # Publish the rendered video under a readable name.
            remote_path = f"results/studio_{slugify(script)}_{job_id[:8]}.mp4"
            bucket = supabase.storage.from_("content")
            with open(rendered, 'rb') as source:
                bucket.upload(
                    path=remote_path,
                    file=source,
                    file_options={"content-type": "video/mp4"}
                )

            _update_job({
                "status": "completed",
                "result_url": bucket.get_public_url(remote_path)
            })

    except Exception as e:
        import traceback
        traceback.print_exc()
        _update_job({
            "status": "error",
            "error": str(e)
        })
        raise e