import gradio as gr
import edge_tts
import os
import io  # used by StoryTTSProcessor.generate_story_audio (BytesIO export buffer)
import random
import json
from pydub import AudioSegment
from pydub.effects import normalize, compress_dynamic_range, low_pass_filter, high_pass_filter
import asyncio
from datetime import datetime
import zipfile
import natsort
import time
import webvtt
import re
from typing import Dict, List, Tuple, Optional
from datetime import timedelta
import numpy as np
import wave


# Environment initialization - prefer GPU
class TTSModel:
    """Local neural-TTS model wrapper (GPU preferred, CPU fallback).

    NOTE(review): this class references ``torch``, ``KModel``, ``KPipeline`` and
    ``Tokenizer``, none of which are imported anywhere in this file, and it reads
    ``self.use_cuda`` before that attribute is ever assigned (the resulting
    AttributeError is silently caught below and forces the CPU fallback). It looks
    like a leftover from a Kokoro-style local-model integration — confirm whether
    it is still used before relying on it.
    """

    def __init__(self):
        self.models = {}
        self.tokenizer = Tokenizer()
        self.voice_cache = {}
        self.voice_files = self._discover_voices()
        try:
            # self.use_cuda is never set; on AttributeError we fall through to
            # the except branch and load the CPU model only.
            if self.use_cuda:
                self.models['cuda'] = torch.compile(KModel().to('cuda').eval(), mode='max-autotune')
                with torch.no_grad():
                    # Warm-up pass so the compiled graph is built up front.
                    _ = self.models['cuda'](torch.randn(1, 64).cuda(), torch.randn(1, 80, 100).cuda(), 1.0)
            self.models['cpu'] = KModel().to('cpu').eval()
        except Exception as e:
            print(f"Error loading model: {e}")
            self.models = {'cpu': KModel().to('cpu').eval()}
        self.pipelines = {
            'a': KPipeline(lang_code='a', model=False),
            'b': KPipeline(lang_code='b', model=False)
        }

    def _discover_voices(self):
        """Discover available voice files in the voices folder"""
        voice_files = {}
        voices_dir = "voices"
        if not os.path.exists(voices_dir):
            os.makedirs(voices_dir)
            print(f"Created voices directory at {os.path.abspath(voices_dir)}")
            return voice_files
        for file in os.listdir(voices_dir):
            if file.endswith(".pt"):
                voice_name = os.path.splitext(file)[0]
                voice_files[voice_name] = os.path.join(voices_dir, file)
                print(f"Found voice: {voice_name}")
        return voice_files

    def get_voice_list(self):
        """Get list of available voices for the UI"""
        voices = list(self.voice_files.keys())
        if not voices:
            print("Warning: No voice files found in voices folder")
        return voices


class TextProcessor:
    """Text-normalization pipeline: expands emails, URLs, numbers, units,
    currency, times, years and symbols into speakable English words."""

    @staticmethod
    def clean_text(text: str) -> str:
        """Normalize whitespace and punctuation spacing after special-case expansion."""
        text = TextProcessor._process_special_cases(text)
        re_tab = re.compile(r'[\r\t]')
        re_spaces = re.compile(r' +')
        re_punctuation = re.compile(r'(\s)([,.!?])')
        text = re_tab.sub(' ', text)
        text = re_spaces.sub(' ', text)
        # Drop the space before punctuation ("word ," -> "word,").
        text = re_punctuation.sub(r'\2', text)
        return text.strip()

    @staticmethod
    def _process_special_cases(text: str) -> str:
        """Special-case processing pipeline in optimized order."""
        # Order matters: emails before websites (both contain dots/@),
        # measurements before math (both contain bare numbers), etc.
        text = TextProcessor._process_emails(text)
        text = TextProcessor._process_websites(text)
        text = TextProcessor._process_phone_numbers(text)
        text = TextProcessor._process_temperatures(text)
        text = TextProcessor._process_measurements(text)
        text = TextProcessor._process_currency(text)
        text = TextProcessor._process_percentages(text)
        text = TextProcessor._process_math_operations(text)
        text = TextProcessor._process_times(text)
        text = TextProcessor._process_years(text)
        text = TextProcessor._process_special_symbols(text)
        return text

    @staticmethod
    def _process_emails(text: str) -> str:
        """Process emails with correct English pronunciation for all special characters"""
        def convert_email(match):
            full_email = match.group(0)
            # Replace each special character with its English pronunciation
            processed = (full_email
                         .replace('@', ' at ')
                         .replace('.', ' dot ')
                         .replace('-', ' dash ')
                         .replace('_', ' underscore ')
                         .replace('+', ' plus ')
                         .replace('/', ' slash ')
                         .replace('=', ' equals '))
            return processed

        # Regex to match all email formats
        email_pattern = r'\b[\w.+-]+@[\w.-]+\.[a-zA-Z]{2,}\b'
        return re.sub(email_pattern, convert_email, text)

    @staticmethod
    def _process_websites(text: str) -> str:
        """Process websites with correct English pronunciation for special characters"""
        def convert_website(match):
            url = match.group(1)
            # Replace each special character with its English pronunciation
            return (url.replace('.', ' dot ')
                    .replace('-', ' dash ')
                    .replace('_', ' underscore ')
                    .replace('/', ' slash ')
                    .replace('?', ' question mark ')
                    .replace('=', ' equals ')
                    .replace('&', ' ampersand '))

        # Only process websites that don't contain @ (to avoid conflict with emails)
        website_pattern = r'\b(?![\w.-]*@)((?:https?://)?(?:www\.)?[\w.-]+\.[a-z]{2,}(?:[/?=][\w.-]*)*)\b'
        return re.sub(website_pattern, convert_website, text, flags=re.IGNORECASE)

    @staticmethod
    def _process_temperatures(text: str) -> str:
        """Process temperatures and cardinal directions with degree symbols"""
        def temp_to_words(temp, unit):
            temp_text = TextProcessor._number_to_words(temp)
            unit = unit.upper() if unit else ''
            unit_map = {
                'C': 'degrees Celsius',
                'F': 'degrees Fahrenheit',
                'N': 'degrees north',
                'S': 'degrees south',
                'E': 'degrees east',
                'W': 'degrees west',
                '': 'degrees'  # Default case for just number with degree symbol
            }
            unit_text = unit_map.get(unit, f'degrees {unit}')
            return f"{temp_text} {unit_text}"

        # Process formats like 75°F, 100°C, 15°N, 120°E
        text = re.sub(
            r'(-?\d+)°([NSEWCFnsewcf]?)',
            lambda m: temp_to_words(m.group(1), m.group(2)),
            text,
            flags=re.IGNORECASE
        )
        # Add degree symbol pronunciation when standalone
        text = re.sub(r'°', ' degrees ', text)
        return text

    @staticmethod
    def _process_measurements(text: str) -> str:
        """Process measurement units, reading decimals naturally
        (1.65m -> 'one point six five meters')."""
        units_map = {
            'km/h': 'kilometers per hour', 'mph': 'miles per hour',
            'kg': 'kilograms', 'g': 'grams', 'cm': 'centimeters',
            'm': 'meter',  # singular so pluralization below can add 's'
            'mm': 'millimeters',
            'L': 'liter', 'l': 'liter', 'ml': 'milliliter', 'mL': 'milliliter',
            'h': 'hour', 'min': 'minute', 's': 'second'
        }
        # Units that never get an 's' suffix even when plural.
        plural_units = {'L', 'l', 'mL', 'ml'}

        def measurement_to_words(value, unit):
            try:
                unit_lower = unit.lower()
                unit_text = units_map.get(unit, units_map.get(unit_lower, unit))
                # Read decimals digit-by-digit: "one point six five".
                if '.' in value:
                    integer, decimal = value.split('.')
                    value_text = (
                        f"{TextProcessor._number_to_words(integer)} "
                        f"point {' '.join(TextProcessor._digit_to_word(d) for d in decimal)}"
                    )
                else:
                    value_text = TextProcessor._number_to_words(value)
                # Pluralize (add 's' when value != 1 and the unit allows it).
                if float(value) != 1 and unit in units_map and unit not in plural_units:
                    unit_text += 's'
                return f"{value_text} {unit_text}"
            except Exception:
                return f"{value}{unit}"  # Keep original on any conversion error

        # Match number + unit, including attached forms like "1.65m".
        text = re.sub(
            r'(-?\d+\.?\d*)\s*({})s?\b'.format('|'.join(re.escape(key) for key in units_map.keys())),
            lambda m: measurement_to_words(m.group(1), m.group(2)),
            text,
            flags=re.IGNORECASE
        )
        return text

    @staticmethod
    def _process_currency(text: str) -> str:
        """Process currency (integers, decimals, and sentence-final periods)."""
        currency_map = {
            '$': 'dollars', '€': 'euros', '£': 'pounds',
            '¥': 'yen', '₩': 'won', '₽': 'rubles'
        }

        def currency_to_words(value, symbol):
            # Sentence-final period case (e.g. "$20.").
            # NOTE(review): the capture group below never includes a trailing
            # dot, so this branch appears unreachable — kept for safety.
            if value.endswith('.'):
                value = value[:-1]
                return f"{TextProcessor._number_to_words(value)} {currency_map.get(symbol, '')}."
            # Decimal case (e.g. $20.5 -> "twenty dollars and fifty cents").
            if '.' in value:
                integer_part, decimal_part = value.split('.')
                decimal_part = decimal_part.ljust(2, '0')  # Ensure two digits
                return (
                    f"{TextProcessor._number_to_words(integer_part)} {currency_map.get(symbol, '')} "
                    f"and {TextProcessor._number_to_words(decimal_part)} cents"
                )
            # Integer case (e.g. $20 -> "twenty dollars").
            return f"{TextProcessor._number_to_words(value)} {currency_map.get(symbol, '')}"

        # Match currency only when followed by an end-of-token character.
        text = re.sub(
            r'([$€£¥₩₽])(\d+(?:\.\d+)?)(?=\s|$|\.|,|;)',
            lambda m: currency_to_words(m.group(2), m.group(1)),
            text
        )
        return text

    @staticmethod
    def _process_percentages(text: str) -> str:
        """Process percentages."""
        text = re.sub(
            r'(\d+\.?\d*)%',
            lambda m: f"{TextProcessor._number_to_words(m.group(1))} percent",
            text
        )
        return text

    @staticmethod
    def _process_math_operations(text: str) -> str:
        """Process math operations and numeric ranges."""
        math_map = {
            '+': 'plus',
            '-': 'minus',  # default; ranges are handled separately below
            '×': 'times', '*': 'times',
            '÷': 'divided by', '/': 'divided by',
            '=': 'equals', '>': 'is greater than', '<': 'is less than'
        }
        # RANGE (3-4 -> "three to four") when NOT followed by =/+/*... operators.
        text = re.sub(
            r'(\d+)\s*-\s*(\d+)(?!\s*[=+×*÷/><])',
            lambda m: f"{TextProcessor._number_to_words(m.group(1))} to {TextProcessor._number_to_words(m.group(2))}",
            text
        )
        # SUBTRACTION (only when followed by =/+/*... operators).
        text = re.sub(
            r'(\d+)\s*-\s*(\d+)(?=\s*[=+×*÷/><])',
            lambda m: f"{TextProcessor._number_to_words(m.group(1))} minus {TextProcessor._number_to_words(m.group(2))}",
            text
        )
        # OTHER operators (+, *, /, ...).
        text = re.sub(
            r'(\d+)\s*([+×*÷/=><])\s*(\d+)',
            lambda m: (f"{TextProcessor._number_to_words(m.group(1))} "
                       f"{math_map.get(m.group(2), m.group(2))} "
                       f"{TextProcessor._number_to_words(m.group(3))}"),
            text
        )
        # Fractions like 4/5.
        text = re.sub(
            r'(\d+)/(\d+)',
            lambda m: (f"{TextProcessor._number_to_words(m.group(1))} "
                       f"divided by {TextProcessor._number_to_words(m.group(2))}"),
            text
        )
        return text

    @staticmethod
    def _process_special_symbols(text: str) -> str:
        """Process special symbols."""
        symbol_map = {
            '@': 'at', '#': 'number', '&': 'and', '_': 'underscore'
        }
        # @home -> at home
        text = re.sub(
            r'@(\w+)',
            lambda m: f"at {m.group(1)}",
            text
        )
        # #1 -> number one
        text = re.sub(
            r'#(\d+)',
            lambda m: f"number {TextProcessor._number_to_words(m.group(1))}",
            text
        )
        # Remaining standalone symbols.
        for symbol, replacement in symbol_map.items():
            text = text.replace(symbol, f' {replacement} ')
        return text

    @staticmethod
    def _process_times(text: str) -> str:
        """Process ALL time formats (hour:minute:second, with/without AM/PM)."""
        text = re.sub(
            r'\b(\d{1,2}):(\d{2})(?::(\d{2}))?\s*(AM|PM|am|pm)?\b',
            lambda m: TextProcessor._time_to_words(m.group(1), m.group(2), m.group(3), m.group(4)),
            text
        )
        return text

    @staticmethod
    def _time_to_words(hour: str, minute: str, second: str = None, period: str = None) -> str:
        """Convert a clock time to natural speech (including seconds when present)."""
        hour_int = int(hour)
        minute_int = int(minute)
        # 1. AM/PM (normalized to uppercase).
        period_text = f" {period.upper()}" if period else ""
        # 2. 24h -> 12h conversion.
        hour_12 = hour_int % 12
        hour_text = "twelve" if hour_12 == 0 else TextProcessor._number_to_words(str(hour_12))
        # 3. Minutes (zero-width spaces around o'clock are intentional TTS hints).
        minute_text = " \u200Bo'clock\u200B " if minute_int == 0 else \
            f"oh {TextProcessor._number_to_words(minute)}" if minute_int < 10 else \
            TextProcessor._number_to_words(minute)
        # 4. Seconds (when present).
        second_text = ""
        if second and int(second) > 0:
            second_text = f" and {TextProcessor._number_to_words(second)} seconds"
        # 5. Assemble.
        if minute_int == 0 and not second_text:
            return f"{hour_text}{minute_text}{period_text}"  # 3:00 -> "three o'clock"
        else:
            return f"{hour_text} {minute_text}{second_text}{period_text}"  # 3:05:30 -> "three oh five and thirty seconds"

    @staticmethod
    def _process_years(text: str) -> str:
        """Process years found in the text."""
        # Four-digit years 1000-2999 (most common).
        text = re.sub(
            r'\b(1[0-9]{3}|2[0-9]{3})\b',
            lambda m: TextProcessor._year_to_words(m.group(1)),
            text
        )
        # Two-digit years.
        # NOTE(review): this also rewrites ANY standalone 2-digit number —
        # confirm that is the intended behavior.
        text = re.sub(
            r'\b([0-9]{2})\b',
            lambda m: TextProcessor._two_digit_year_to_words(m.group(1)),
            text
        )
        return text

    @staticmethod
    def _year_to_words(year: str) -> str:
        """Convert a four-digit year to words."""
        if len(year) != 4:
            return year
        # 2000-2099 read as "twenty twenty-one" style.
        if year.startswith('20'):
            return f"twenty {TextProcessor._two_digit_year_to_words(year[2:])}"
        # Other years read as plain numbers.
        return TextProcessor._number_to_words(year)

    @staticmethod
    def _two_digit_year_to_words(num: str) -> str:
        """Convert a two-digit number to words (for years)."""
        if len(num) != 2:
            return num
        num_int = int(num)
        if num_int == 0:
            return "zero zero"
        if num_int < 10:
            return f"oh {TextProcessor._digit_to_word(num[1])}"
        ones = ['', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine',
                'ten', 'eleven', 'twelve', 'thirteen', 'fourteen', 'fifteen', 'sixteen',
                'seventeen', 'eighteen', 'nineteen']
        tens = ['', '', 'twenty', 'thirty', 'forty', 'fifty', 'sixty', 'seventy', 'eighty', 'ninety']
        if num_int < 20:
            return ones[num_int]
        ten, one = divmod(num_int, 10)
        if one == 0:
            return tens[ten]
        return f"{tens[ten]} {ones[one]}"

    @staticmethod
    def _process_phone_numbers(text: str) -> str:
        """Process phone numbers with a more precise regex."""
        # Pattern avoids clashing with Roman numerals.
        phone_pattern = r'\b(\d{3})[-. ]?(\d{3})[-. ]?(\d{4})\b'

        def phone_to_words(match):
            groups = match.groups()
            # Read each digit of each group, comma-separated to create pauses.
            parts = []
            for part in groups:
                digits = ' '.join([TextProcessor._digit_to_word(d) for d in part])
                parts.append(digits)
            return ', '.join(parts)  # commas create speech pauses between groups

        return re.sub(phone_pattern, phone_to_words, text)

    @staticmethod
    def _process_currency_numbers(text: str) -> str:
        # NOTE(review): '$' is part of an optional prefix outside the capture
        # group, so the '$' check on the whole match decides dollars vs plain.
        return re.sub(
            r'\$?(\d{1,3}(?:,\d{3})*(?:\.\d+)?)\b',
            lambda m: f"{TextProcessor._number_to_words(m.group(1))} dollars"
            if '$' in m.group(0) else TextProcessor._number_to_words(m.group(1)),
            text
        )

    @staticmethod
    def _digit_to_word(digit: str) -> str:
        digit_map = {
            '0': 'zero', '1': 'one', '2': 'two', '3': 'three', '4': 'four',
            '5': 'five', '6': 'six', '7': 'seven', '8': 'eight', '9': 'nine'
        }
        return digit_map.get(digit, digit)

    @staticmethod
    def _number_to_words(number: str) -> str:
        num_str = number.replace(',', '')
        try:
            if '.' in num_str:
                integer_part, decimal_part = num_str.split('.')
                integer_text = TextProcessor._int_to_words(integer_part)
                decimal_text = ' '.join([TextProcessor._digit_to_word(d) for d in decimal_part])
                return f"{integer_text} point {decimal_text}"
            return TextProcessor._int_to_words(num_str)
        except Exception:
            return number

    @staticmethod
    def _digits_to_words(digits: str) -> str:
        return ' '.join([TextProcessor._digit_to_word(d) for d in digits])

    @staticmethod
    def _int_to_words(num_str: str) -> str:
        num = int(num_str)
        if num == 0:
            return 'zero'
        units = ['', 'thousand', 'million', 'billion', 'trillion']
        words = []
        level = 0
        while num > 0:
            chunk = num % 1000
            if chunk != 0:
                words.append(TextProcessor._convert_less_than_thousand(chunk) + ' ' + units[level])
            num = num // 1000
            level += 1
        return ' '.join(reversed(words)).strip()

    @staticmethod
    def _convert_less_than_thousand(num: int) -> str:
        ones = ['', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine',
                'ten', 'eleven', 'twelve', 'thirteen', 'fourteen', 'fifteen', 'sixteen',
                'seventeen', 'eighteen', 'nineteen']
        tens = ['', '', 'twenty', 'thirty', 'forty', 'fifty', 'sixty', 'seventy', 'eighty', 'ninety']
        if num == 0:
            return ''
        if num < 20:
            return ones[num]
        if num < 100:
            return tens[num // 10] + (' ' + ones[num % 10] if num % 10 != 0 else '')
        return ones[num // 100] + ' hundred' + (
            ' ' + TextProcessor._convert_less_than_thousand(num % 100) if num % 100 != 0 else '')

    @staticmethod
    def split_sentences(text: str) -> List[str]:
        # NOTE(review): the original body of this method was garbled in the
        # source (the compiled regex and everything up to parse_dialogue's
        # signature was lost). Reconstructed as a conventional sentence splitter
        # that avoids breaking after common abbreviations — verify against the
        # original intent.
        sentence_end = re.compile(r'(?<=[.!?])\s+')
        abbreviations = ('Mr.', 'Mrs.', 'Ms.', 'Dr.', 'Prof.', 'St.')
        sentences: List[str] = []
        for part in sentence_end.split(text):
            part = part.strip()
            if not part:
                continue
            # Re-join fragments that were split right after an abbreviation.
            if sentences and sentences[-1].endswith(abbreviations):
                sentences[-1] = f"{sentences[-1]} {part}"
            else:
                sentences.append(part)
        return sentences

    @staticmethod
    def parse_dialogue(text: str, prefixes: List[str]) -> List[Tuple[str, str]]:
        """Parse dialogue content using the given speaker prefixes."""
        dialogues = []
        current = None
        for line in text.split('\n'):
            line = line.strip()
            if not line:
                continue
            # Does this line start with any of the speaker prefixes?
            found_prefix = None
            for prefix in prefixes:
                if line.lower().startswith(prefix.lower() + ':'):
                    found_prefix = prefix
                    break
            if found_prefix:
                if current:
                    # Run special-case expansion before committing the utterance.
                    processed_content = TextProcessor._process_special_cases(current[1])
                    dialogues.append((current[0], processed_content))
                speaker = found_prefix
                content = line[len(found_prefix) + 1:].strip()
                current = (speaker, content)
            elif current:
                # Continuation line: append to the current speaker's text.
                current = (current[0], current[1] + ' ' + line)
        if current:
            # Flush the final utterance.
            processed_content = TextProcessor._process_special_cases(current[1])
            dialogues.append((current[0], processed_content))
        return dialogues


# ==================== AUDIO PROCESSOR ====================
# FIX(review): the source defined AudioProcessor twice; the second definition
# shadowed this one and dropped enhance_audio/combine_segments, breaking
# generate_story_audio. The two definitions are merged into this single class.
class AudioProcessor:
    @staticmethod
    def enhance_audio(audio: np.ndarray, volume: float = 1.0, pitch: float = 1.0) -> np.ndarray:
        """Normalize, soft-clip, pitch-shift, compress and filter a float audio
        buffer (assumed mono, 24 kHz — TODO confirm against callers)."""
        # 1. Normalize with clipping protection (keep 10% headroom).
        max_sample = np.max(np.abs(audio)) + 1e-8
        audio = (audio / max_sample) * 0.9 * volume
        # 2. Soft clipping to avoid distortion (tanh gives a smooth knee).
        audio = np.tanh(audio * 1.5) / 1.5
        # 3. Convert to AudioSegment for pitch handling.
        audio_seg = AudioSegment(
            (audio * 32767).astype(np.int16).tobytes(),
            frame_rate=24000,
            sample_width=2,
            channels=1
        )
        # 4. Pitch shift via frame-rate resample, with short crossfades.
        if pitch != 1.0:
            audio_seg = audio_seg._spawn(
                audio_seg.raw_data,
                overrides={"frame_rate": int(audio_seg.frame_rate * pitch)}
            ).set_frame_rate(24000).fade_in(10).fade_out(10)
        # 5. Dynamics and band-pass filtering.
        audio_seg = compress_dynamic_range(
            audio_seg,
            threshold=-12.0,
            ratio=3.5,
            attack=5,
            release=50
        )
        audio_seg = audio_seg.low_pass_filter(11000).high_pass_filter(200)
        # 6. Safe final normalization.
        if audio_seg.max_dBFS > -1.0:
            audio_seg = audio_seg.apply_gain(-audio_seg.max_dBFS * 0.8)
        return np.array(audio_seg.get_array_of_samples()) / 32768.0

    @staticmethod
    def calculate_pause(text: str, pause_settings: Dict[str, int]) -> int:
        """Calculate pause duration with more precise rules"""
        text = text.strip()
        if not text:
            return 0
        # Special cases that should have no pause
        if re.search(r'(?:^|\s)(?:Mr|Mrs|Ms|Dr|Prof|St|A\.M|P\.M|etc|e\.g|i\.e)\.$', text, re.IGNORECASE):
            return 0
        # Time formats (12:30) - minimal pause
        if re.search(r'\b\d{1,2}:\d{2}\b', text):
            return pause_settings.get('time_colon_pause', 50)  # Default 50ms for times
        # Determine pause based on last character
        last_char = text[-1]
        return pause_settings.get(last_char, pause_settings['default_pause'])

    @staticmethod
    def combine_segments(segments: List[AudioSegment], pauses: List[int]) -> AudioSegment:
        """Combine audio segments with frame-accurate timing"""
        combined = AudioSegment.silent(duration=0)  # Start with 0 silence
        for i, (seg, pause) in enumerate(zip(segments, pauses)):
            # Apply fades without affecting duration
            seg = seg.fade_in(10).fade_out(10)
            # Add segment
            combined += seg
            # Add pause if not the last segment
            if i < len(segments) - 1:
                combined += AudioSegment.silent(duration=max(50, pause))
        return combined

    @staticmethod
    def combine_with_pauses(segments: List[AudioSegment], pauses: List[int]) -> AudioSegment:
        combined = AudioSegment.empty()
        for i, (seg, pause) in enumerate(zip(segments, pauses)):
            seg = seg.fade_in(50).fade_out(50)
            combined += seg
            if i < len(segments) - 1:
                combined += AudioSegment.silent(duration=pause)
        return combined


# ==================== SYSTEM CONFIGURATION ====================
class TTSConfig:
    SETTINGS_FILE = "edge_tts_settings.json"
    LANGUAGES = {
        "Tiếng Việt": [
            {"name": "vi-VN-HoaiMyNeural", "gender": "Nữ"},
            {"name": "vi-VN-NamMinhNeural", "gender": "Nam"}
        ],
        "English (US)": [
            {"name": "en-US-GuyNeural", "gender": "Nam"},
            {"name": "en-US-JennyNeural", "gender": "Nữ"},
            {"name": "en-US-AvaNeural", "gender": "Nữ"},
            {"name": "en-US-AndrewNeural", "gender": "Nam"},
            {"name": "en-US-EmmaNeural", "gender": "Nữ"},
            {"name": "en-US-BrianNeural", "gender": "Nam"},
            {"name": "en-US-AnaNeural", "gender": "Nữ"},
            {"name": "en-US-AndrewMultilingualNeural", "gender": "Nam"},
            {"name": "en-US-AriaNeural", "gender": "Nữ"},
            {"name": "en-US-AvaMultilingualNeural", "gender": "Nữ"},
            {"name": "en-US-BrianMultilingualNeural", "gender": "Nam"},
            {"name": "en-US-ChristopherNeural", "gender": "Nam"},
            {"name": "en-US-EmmaMultilingualNeural", "gender": "Nữ"},
            {"name": "en-US-EricNeural", "gender": "Nam"},
            {"name": "en-US-MichelleNeural", "gender": "Nữ"},
            {"name": "en-US-RogerNeural", "gender": "Nam"},
            {"name": "en-US-SteffanNeural", "gender": "Nam"}
        ],
        "English (UK)": [
            {"name": "en-GB-LibbyNeural", "gender": "Nữ"},
            {"name": "en-GB-MiaNeural", "gender": "Nữ"},
            {"name": "en-GB-RyanNeural", "gender": "Nam"},
            {"name": "en-GB-MaisieNeural", "gender": "Nữ"},
            {"name": "en-GB-SoniaNeural", "gender": "Nữ"},
            {"name": "en-GB-ThomasNeural", "gender": "Nam"}
        ]
    }


# ==================== SUBTITLE GENERATOR ====================
class SubtitleGenerator:
    @staticmethod
    def clean_subtitle_text(text: str) -> str:
        """Remove Q:/A:/CHARx: prefixes from subtitle text"""
        cleaned = re.sub(r'^(Q|A|CHAR\d+):\s*', '', text.strip())
        return cleaned

    @staticmethod
    def split_long_sentences(text: str, max_length: int = 120) -> List[str]:
        """Split long sentences at punctuation marks while preserving meaning"""
        sentences = []
        current = ""
        # Split at punctuation first
        parts = re.split(r'([.!?])', text)
        # Recombine with punctuation but check length
        for i in range(0, len(parts) - 1, 2):
            part = parts[i] + (parts[i + 1] if i + 1 < len(parts) else "")
            if len(current + part) <= max_length:
                current += part
            else:
                if current:
                    sentences.append(current)
                current = part
        if current:
            sentences.append(current)
        return sentences

    @staticmethod
    def generate_srt(audio_segments: List[AudioSegment], sentences: List[str],
                     pause_settings: Dict[str, int]) -> str:
        """Generate SRT format subtitles with precise timing information"""
        subtitles = []
        current_time = 150  # Start with initial silence (150ms)
        max_subtitle_length = 120  # Maximum characters per subtitle line
        for i, (seg, sentence) in enumerate(zip(audio_segments, sentences)):
            # Remove Q: and A: prefixes if present
            cleaned_sentence = re.sub(r'^(Q|A|CHAR\d+):\s*', '', sentence.strip())
            # Split long sentences into smaller chunks at punctuation
            sentence_chunks = SubtitleGenerator.split_long_sentences(cleaned_sentence, max_subtitle_length)
            # Calculate duration per chunk (equal division for simplicity)
            chunk_duration = len(seg) / max(1, len(sentence_chunks))
            for j, chunk in enumerate(sentence_chunks):
                start_time = current_time + (j * chunk_duration)
                end_time = start_time + chunk_duration
                # Add subtitle entry
                subtitles.append({
                    'start': int(start_time),
                    'end': int(end_time),
                    'text': chunk.strip()
                })
            # Update current time with segment duration
            current_time += len(seg)
            # Add pause if not the last segment
            if i < len(audio_segments) - 1:
                pause = AudioProcessor.calculate_pause(sentence, pause_settings)
                current_time += max(100, pause)
        # Convert to SRT format with precise timing
        srt_content = []
        for idx, sub in enumerate(subtitles, 1):
            start_time = timedelta(milliseconds=sub['start'])
            end_time = timedelta(milliseconds=sub['end'])
            # Format: 00:00:01,040 --> 00:00:09,760
            start_str = f"{start_time.total_seconds() // 3600:02.0f}:{(start_time.total_seconds() % 3600) // 60:02.0f}:{start_time.total_seconds() % 60:06.3f}".replace('.', ',')
            end_str = f"{end_time.total_seconds() // 3600:02.0f}:{(end_time.total_seconds() % 3600) // 60:02.0f}:{end_time.total_seconds() % 60:06.3f}".replace('.', ',')
            srt_content.append(
                f"{idx}\n"
                f"{start_str} --> {end_str}\n"
                f"{sub['text']}\n"
            )
        return "\n".join(srt_content)


# ==================== BASE PROCESSOR CLASS ====================
class BaseTTSProcessor:
    def __init__(self):
        self.voice_map = {}
        self.initialize_voices()
        self.load_settings()
        self.audio_processor = AudioProcessor()
        self.subtitle_generator = SubtitleGenerator()
        # FIX(review): generate_story_audio reads self.text_processor, which was
        # never assigned anywhere — set it here so that path cannot AttributeError.
        self.text_processor = TextProcessor()

    def initialize_voices(self):
        """Build display-name -> voice-id map from TTSConfig.LANGUAGES."""
        for lang, voices in TTSConfig.LANGUAGES.items():
            for voice in voices:
                voice_name = voice['name'].split('-')[-1].replace('Neural', '')
                display_name = f"{lang} - {voice_name} ({voice['gender']})"
                self.voice_map[display_name] = voice['name']

    def load_settings(self):
        if os.path.exists(TTSConfig.SETTINGS_FILE):
            with open(TTSConfig.SETTINGS_FILE, 'r') as f:
                self.settings = json.load(f)
        else:
            self.settings = {}

    def save_settings(self):
        with open(TTSConfig.SETTINGS_FILE, 'w') as f:
            json.dump(self.settings, f)

    async def generate_speech(self, text, voice_id, rate, pitch, volume):
        """Synthesize text with edge-tts; returns (temp mp3 path, word subtitles in ms)."""
        try:
            # Add random delay between requests to prevent server overload
            await asyncio.sleep(random.uniform(0.1, 0.5))
            # FIX(review): edge-tts requires an explicit sign ("+10%"/"-10%");
            # the old f"{rate}%" produced "10%" for positive rates and was rejected.
            rate_str = f"+{rate}%" if rate >= 0 else f"{rate}%"
            pitch_str = f"+{pitch}Hz" if pitch >= 0 else f"{pitch}Hz"
            communicate = edge_tts.Communicate(text, voice_id, rate=rate_str, pitch=pitch_str)
            temp_file = f"temp_{random.randint(1000,9999)}.mp3"
            # Generate audio and subtitles
            subs = []
            async for chunk in communicate.stream():
                if chunk["type"] == "audio":
                    with open(temp_file, "ab") as audio_file:
                        audio_file.write(chunk["data"])
                elif chunk["type"] == "WordBoundary":
                    # FIX(review): edge-tts reports offset/duration in
                    # 100-nanosecond ticks; downstream code consumes these as
                    # milliseconds (timedelta(milliseconds=...)), so convert here.
                    # (Also removed the original `start_time = end_time` line,
                    # which referenced an undefined name and made every call
                    # fail into the except branch.)
                    start_ms = chunk["offset"] // 10_000
                    subs.append({
                        "text": chunk["text"],
                        "start": start_ms,
                        "end": start_ms + chunk["duration"] // 10_000
                    })
            # Audio processing pipeline
            audio = AudioSegment.from_file(temp_file)
            # Apply volume adjustment (limit to +10dB max)
            volume_adjustment = min(max(volume - 100, -50), 10)  # Limit to +10dB max
            audio = audio + volume_adjustment
            # Apply audio processing effects
            audio = normalize(audio)
            audio = compress_dynamic_range(audio, threshold=-20.0, ratio=4.0)
            audio = low_pass_filter(audio, 14000)  # Reduce high-frequency hiss
            audio = high_pass_filter(audio, 100)  # Remove ultra-low frequencies
            # Export with higher bitrate
            audio.export(temp_file, format="mp3", bitrate="256k")
            return temp_file, subs
        except Exception as e:
            print(f"Error generating speech: {str(e)}")
            return None, []

    def generate_srt(self, subtitles, output_path):
        """Generate SRT file from subtitles data"""
        if not subtitles:
            return None
        srt_path = output_path.replace('.mp3', '.srt')
        try:
            with open(srt_path, 'w', encoding='utf-8') as f:
                for i, sub in enumerate(subtitles, start=1):
                    start = timedelta(milliseconds=sub["start"])
                    end = timedelta(milliseconds=sub["end"])
                    # Format: 00:00:01,040 --> 00:00:09,760
                    start_str = f"{start.total_seconds() // 3600:02.0f}:{(start.total_seconds() % 3600) // 60:02.0f}:{start.total_seconds() % 60:06.3f}".replace('.', ',')
                    end_str = f"{end.total_seconds() // 3600:02.0f}:{(end.total_seconds() % 3600) // 60:02.0f}:{end.total_seconds() % 60:06.3f}".replace('.', ',')
                    f.write(f"{i}\n{start_str} --> {end_str}\n{sub['text']}\n\n")
            return srt_path
        except Exception as e:
            print(f"Error generating SRT: {e}")
            return None

    def _format_time(self, milliseconds):
        """Convert milliseconds to SRT time format"""
        seconds, milliseconds = divmod(milliseconds, 1000)
        minutes, seconds = divmod(seconds, 60)
        hours, minutes = divmod(minutes, 60)
        return f"{hours:02d}:{minutes:02d}:{seconds:02d},{milliseconds:03d}"

    def check_srt_generated(self, audio_path):
        if not audio_path:
            return False
        srt_path = audio_path.replace('.mp3', '.srt')
        return os.path.exists(srt_path)


# ==================== TAB 1: SINGLE CHARACTER ====================
class StoryTTSProcessor(BaseTTSProcessor):
    def __init__(self):
        super().__init__()
        if not self.settings.get("single_char"):
            self.settings["single_char"] = {
                "language": "Tiếng Việt",
                "voice": "Tiếng Việt - HoaiMy (Nữ)",
                "rate": 0,
                "pitch": 0,
                "volume": 100,
                "pause": 500
            }

    async def process_story(self, content, voice, rate, pitch, volume, pause, save_settings):
        """Synthesize each non-empty line, merge to one mp3 + full SRT."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        voice_dir = f"story_{timestamp}"
        os.makedirs(voice_dir, exist_ok=True)
        lines = [line.strip() for line in content.splitlines() if line.strip()]
        all_subs = []
        audio_files = []
        for idx, text in enumerate(lines):
            try:
                temp_file, subs = await self.generate_speech(
                    text, self.voice_map[voice], rate, pitch, volume
                )
                if temp_file:
                    new_name = f"{voice_dir}/line_{idx+1:03d}.mp3"
                    os.rename(temp_file, new_name)
                    audio_files.append(new_name)
                    # Process subtitles
                    if subs:
                        line_subs = []
                        for sub in subs:
                            line_subs.append({
                                "text": sub["text"],
                                "start": sub["start"],
                                "end": sub["end"]
                            })
                        all_subs.append(line_subs)
            except Exception as e:
                print(f"❌ Lỗi dòng {idx+1}: {str(e)}")
        if not audio_files:
            return None, None, "❌ Không tạo được file âm thanh"
        merged_path = self.merge_audio(voice_dir, pause)
        srt_path = self.generate_full_srt(all_subs, pause, merged_path)
        if save_settings:
            self.settings["single_char"] = {
                "language": next(k for k in TTSConfig.LANGUAGES.keys() if voice.startswith(k)),
                "voice": voice,
                "rate": rate,
                "pitch": pitch,
                "volume": volume,
                "pause": pause
            }
            self.save_settings()
        return merged_path, srt_path, "✅ Hoàn thành! Bấm vào nút phát để nghe"

    def merge_audio(self, voice_dir, pause_duration):
        """Concatenate the per-line mp3s (natural sort order) with fixed pauses."""
        files = natsort.natsorted([f for f in os.listdir(voice_dir) if f.startswith("line_")])
        merged = AudioSegment.empty()
        pause = AudioSegment.silent(duration=pause_duration)
        for file in files:
            try:
                audio = AudioSegment.from_file(os.path.join(voice_dir, file))
                audio = audio.fade_in(50).fade_out(50)
                audio = normalize(audio)
                merged += audio + pause
            except Exception as e:
                print(f"❌ Lỗi file {file}: {str(e)}")
        merged = merged.low_pass_filter(15000)
        merged = compress_dynamic_range(merged)
        output_path = os.path.join(voice_dir, "merged_story.mp3")
        merged.export(output_path, format="mp3", bitrate="256k")
        return output_path

    def generate_full_srt(self, all_subs, pause_duration, audio_path):
        """Generate SRT for the full merged audio"""
        if not any(all_subs):
            return None
        vtt = webvtt.WebVTT()
        current_time = 0
        for line_subs in all_subs:
            for sub in line_subs:
                start = current_time + sub["start"]
                end = current_time + sub["end"]
                # FIX(review): webvtt.Caption requires HH:MM:SS.mmm (dot);
                # _format_time emits the SRT comma form, which would raise
                # MalformedCaptionError — convert the separator here.
                # NOTE(review): vtt.save() writes WebVTT-formatted content to a
                # .srt path; consider webvtt-py's save_as_srt if true SRT output
                # is required.
                vtt.captions.append(webvtt.Caption(
                    self._format_time(start).replace(',', '.'),
                    self._format_time(end).replace(',', '.'),
                    sub["text"]
                ))
            # Add pause time after each line
            current_time += line_subs[-1]["end"] + pause_duration if line_subs else 0
        srt_path = audio_path.replace('.mp3', '.srt')
        vtt.save(srt_path)
        return srt_path

    def generate_story_audio(self, text: str, voice: str, speed: float, device: str,
                             pause_settings: Dict[str, int], volume: float = 1.0,
                             pitch: float = 1.0) -> Tuple[Tuple[int, np.ndarray], str, str]:
        """Local-model path: clean text, synthesize per sentence, merge with pauses.

        NOTE(review): calls self.generate_sentence_audio, which is not defined in
        this file's visible portion — presumably provided elsewhere; confirm.
        """
        start_time = time.time()
        clean_text = self.text_processor.clean_text(text)
        sentences = self.text_processor.split_sentences(clean_text)
        if not sentences:
            return None, "No content to read", ""
        audio_segments = []
        pause_durations = []
        # Adjust pause settings based on speed
        speed_factor = max(0.5, min(2.0, speed))
        adjusted_pause_settings = {
            k: int(v / speed_factor) for k, v in pause_settings.items()
        }
        # Generate each audio segment
        for sentence in sentences:
            result = self.generate_sentence_audio(sentence, voice, speed, device, volume, pitch)
            if not result:
                continue
            sample_rate, audio_data = result
            audio_seg = AudioSegment(
                (audio_data * 32767).astype(np.int16).tobytes(),
                frame_rate=sample_rate,
                sample_width=2,
                channels=1
            )
            audio_segments.append(audio_seg)
            # Calculate precise pause duration
            pause = self.audio_processor.calculate_pause(sentence, adjusted_pause_settings)
            pause_durations.append(pause)
        if not audio_segments:
            return None, "Failed to generate audio", ""
        # Combine with frame-accurate timing
        combined_audio = self.audio_processor.combine_segments(audio_segments, pause_durations)
        # Export with precise timing
        # NOTE(review): this returns raw mp3 bytes viewed as a uint8 array paired
        # with a frame rate — confirm the consumer (Gradio?) expects this shape.
        with io.BytesIO() as buffer:
            combined_audio.export(buffer, format="mp3", bitrate="256k",
                                  parameters=["-ar", str(combined_audio.frame_rate)])
            buffer.seek(0)
            audio_data = np.frombuffer(buffer.read(), dtype=np.uint8)
        # Generate subtitles with the same timing used for audio
        subtitles = self.subtitle_generator.generate_srt(audio_segments, sentences, adjusted_pause_settings)
        stats = (f"Processed {len(clean_text)} chars, {len(clean_text.split())} words\n"
                 f"Audio duration: {len(combined_audio)/1000:.2f}s\n"
                 f"Time: {time.time() - start_time:.2f}s\n"
                 f"Device: {device.upper()}")
        return (combined_audio.frame_rate, audio_data), stats, subtitles


# ==================== TAB 2: MULTI CHARACTER ====================
class MultiCharacterTTSProcessor(BaseTTSProcessor):
    def __init__(self):
        super().__init__()
        if not self.settings.get("multi_char"):
            self.settings["multi_char"] = {
                "language_char1": "Tiếng Việt",
                "voice_char1": "Tiếng Việt - HoaiMy (Nữ)",
                "language_char2": "Tiếng Việt",
                "voice_char2": "Tiếng Việt - NamMinh (Nam)",
                "language_char3": "Tiếng Việt",
                "voice_char3": "Tiếng Việt - HoaiMy (Nữ)",
                "rate_char1": -20,
                "pitch_char1": 0,
                "volume_char1": 100,
                "rate_char2": -25,
                "pitch_char2": 0,
                "volume_char2": 100,
                "rate_char3": -15,
                "pitch_char3": 0,
                "volume_char3": 100,
                "repeat_times": 1,
                "pause_between": 500
            }

    def parse_story(self, content):
        """Split a script into (speaker, text) pairs; bare lines continue the
        previous speaker's text."""
        dialogues = []
        for line in content.splitlines():
            line = line.strip()
            if not line:
                continue
            if line.upper().startswith("CHAR1:"):
                dialogues.append(("CHAR1", line[6:].strip()))
            elif line.upper().startswith("CHAR2:"):
                dialogues.append(("CHAR2", line[6:].strip()))
            elif line.upper().startswith("CHAR3:"):
                dialogues.append(("CHAR3", line[6:].strip()))
            elif line.upper().startswith("NARRATOR:"):
                dialogues.append(("NARRATOR", line[9:].strip()))
            else:
                if dialogues:
                    last_char, last_text = dialogues[-1]
                    dialogues[-1] = (last_char, f"{last_text} {line}")
        return dialogues

    async def process_story(self, content, output_format, char1_voice, char2_voice, char3_voice,
                            char1_rate, char2_rate, char3_rate, char1_pitch, char2_pitch, char3_pitch,
                            char1_volume, char2_volume, char3_volume, repeat_times, pause_between,
                            save_settings):
        """Synthesize a multi-character script, one file per utterance, then merge."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        voice_dir = f"story_{timestamp}"
        os.makedirs(voice_dir, exist_ok=True)
        dialogues = self.parse_story(content)
        all_subs = []
        audio_files = []
        for idx, (character, text) in enumerate(dialogues):
            file_prefix = f"{idx+1:03d}"
            if character == "CHAR1":
                voice_id = self.voice_map[char1_voice]
                rate = char1_rate
                pitch = char1_pitch
                volume = char1_volume
                file_name = f"{file_prefix}_CHAR1.{output_format.lower()}"
            elif character == "CHAR2":
                voice_id = self.voice_map[char2_voice]
                rate = char2_rate
                pitch = char2_pitch
                volume = char2_volume
                file_name = f"{file_prefix}_CHAR2.{output_format.lower()}"
            elif character == "CHAR3":
                voice_id = self.voice_map[char3_voice]
                rate = char3_rate
                pitch = char3_pitch
                volume = char3_volume
                file_name = f"{file_prefix}_CHAR3.{output_format.lower()}"
            else:  # NARRATOR — reuses character 1's voice settings
                voice_id = self.voice_map[char1_voice]
                rate = char1_rate
                pitch = char1_pitch
                volume = char1_volume
                file_name = f"{file_prefix}_NARRATOR.{output_format.lower()}"
            try:
                temp_file, subs = await self.generate_speech(text, voice_id, rate, pitch, volume)
                if temp_file:
                    new_path = os.path.join(voice_dir, file_name)
                    os.rename(temp_file, new_path)
                    audio_files.append(new_path)
                    if subs:
                        char_subs = []
                        for sub in subs:
                            char_subs.append({
                                "text": f"{character}: {sub['text']}",
                                "start": sub["start"],
                                "end": sub["end"]
                            })
                        all_subs.append(char_subs)
            except Exception as e:
                print(f"❌ Lỗi khi tạo giọng nói cho đoạn {idx+1}: {str(e)}")
        if not audio_files:
            return None, None, "❌ Không tạo được file âm thanh"
        # NOTE(review): merge_story and this 4-argument generate_full_srt are not
        # visible in this chunk — presumably defined further down the file.
        merged_path = self.merge_story(voice_dir, output_format, repeat_times, pause_between)
        srt_path = self.generate_full_srt(all_subs, pause_between, merged_path, repeat_times)
        if save_settings:
            self.settings["multi_char"] = {
                "language_char1": next(k for k in TTSConfig.LANGUAGES.keys() if char1_voice.startswith(k)),
                "voice_char1": char1_voice,
                "language_char2": next(k for k in TTSConfig.LANGUAGES.keys() if char2_voice.startswith(k)),
                "voice_char2": char2_voice,
                "language_char3": next(k for k in TTSConfig.LANGUAGES.keys() if char3_voice.startswith(k)),
                "voice_char3": char3_voice,
                "rate_char1": char1_rate,
                "pitch_char1": char1_pitch,
                "volume_char1": char1_volume,
                "rate_char2": char2_rate,
                "pitch_char2": char2_pitch,
                "volume_char2": char2_volume,
                "rate_char3": char3_rate,
                "pitch_char3": char3_pitch,
                "volume_char3": char3_volume,
                "repeat_times": repeat_times,
                "pause_between": pause_between
            }
            self.save_settings()
        # NOTE(review): the source was truncated mid-string here; the message is
        # completed to match StoryTTSProcessor.process_story's success message.
        return merged_path, srt_path, "✅ Hoàn thành! Bấm vào nút phát để nghe"
Bấm vào nút phát để nghe" def merge_story(self, voice_dir, fmt, repeat_count, pause_between): all_files = sorted( [f for f in os.listdir(voice_dir) if f.endswith(f".{fmt.lower()}")], key=lambda x: int(x.split('_')[0]) ) merged = AudioSegment.empty() pause = AudioSegment.silent(duration=pause_between) for file in all_files: try: audio = AudioSegment.from_file(os.path.join(voice_dir, file)) audio = audio.fade_in(50).fade_out(50) for _ in range(repeat_count): merged += normalize(audio) merged += pause except Exception as e: print(f"❌ Lỗi khi xử lý file {file}: {str(e)}") return None merged = merged.low_pass_filter(15000) merged = compress_dynamic_range(merged) output_path = os.path.join(voice_dir, f"story_merged.{fmt.lower()}") merged.export(output_path, format=fmt.lower(), bitrate="256k") return output_path def generate_full_srt(self, all_subs, pause_between, audio_path, repeat_times): """Generate SRT for the full merged audio with character markers""" if not any(all_subs): return None vtt = webvtt.WebVTT() current_time = 0 for _ in range(repeat_times): for line_subs in all_subs: for sub in line_subs: start = current_time + sub["start"] end = current_time + sub["end"] vtt.captions.append(webvtt.Caption( self._format_time(start), self._format_time(end), sub["text"] )) current_time += (line_subs[-1]["end"] if line_subs else 0) + pause_between srt_path = audio_path.replace('.mp3', '.srt') vtt.save(srt_path) return srt_path # ==================== TAB 3: Q&A DIALOGUE ==================== class DialogueTTSProcessor(BaseTTSProcessor): def __init__(self): super().__init__() if not self.settings.get("dialogue"): self.settings["dialogue"] = { "language_q": "Tiếng Việt", "voice_q": "Tiếng Việt - HoaiMy (Nữ)", "language_a": "Tiếng Việt", "voice_a": "Tiếng Việt - NamMinh (Nam)", "rate_q": -20, "pitch_q": 0, "volume_q": 100, "rate_a": -25, "pitch_a": 0, "volume_a": 100, "repeat_times": 2, "pause_q": 200, "pause_a": 500 } def parse_dialogues(self, content): dialogues = [] 
current_speaker = None current_text = [] for line in content.splitlines(): line = line.strip() if not line: continue if line.upper().startswith(("Q:", "A:")): if current_speaker is not None: dialogues.append((current_speaker, " ".join(current_text))) parts = line.split(":", 1) current_speaker = parts[0].upper() current_text = [parts[1].strip()] if len(parts) > 1 else [""] else: current_text.append(line) if current_speaker is not None: dialogues.append((current_speaker, " ".join(current_text))) return dialogues async def process_dialogues(self, content, output_format, language_q, voice_q, rate_q, pitch_q, volume_q, language_a, voice_a, rate_a, pitch_a, volume_a, repeat_times, pause_q, pause_a, save_settings): timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") voice_dir = f"dialogues_{timestamp}" os.makedirs(voice_dir, exist_ok=True) dialogues = self.parse_dialogues(content) all_subs = [] audio_files = [] for idx, (speaker, text) in enumerate(dialogues): voice_id = self.voice_map[voice_q if speaker == "Q" else voice_a] rate = rate_q if speaker == "Q" else rate_a pitch = pitch_q if speaker == "Q" else pitch_a volume = volume_q if speaker == "Q" else volume_a try: temp_file, subs = await self.generate_speech(text, voice_id, rate, pitch, volume) if temp_file: prefix = speaker new_name = f"{voice_dir}/{prefix}_{idx+1:03d}.{output_format.lower()}" os.rename(temp_file, new_name) audio_files.append(new_name) if subs: speaker_subs = [] for sub in subs: speaker_subs.append({ "text": f"{speaker}: {sub['text']}", "start": sub["start"], "end": sub["end"] }) all_subs.append(speaker_subs) except Exception as e: print(f"❌ Error generating speech for line {idx+1}: {str(e)}") if not audio_files: return None, None, "❌ Failed to generate audio files" merged_path = self.merge_with_exact_repetition(voice_dir, output_format, repeat_times, pause_q, pause_a) srt_path = self.generate_full_srt(all_subs, pause_q, pause_a, merged_path, repeat_times) if save_settings: self.settings["dialogue"] 
= { "language_q": language_q, "voice_q": voice_q, "language_a": language_a, "voice_a": voice_a, "rate_q": rate_q, "pitch_q": pitch_q, "volume_q": volume_q, "rate_a": rate_a, "pitch_a": pitch_a, "volume_a": volume_a, "repeat_times": repeat_times, "pause_q": pause_q, "pause_a": pause_a } self.save_settings() return merged_path, srt_path, "✅ Done! Click play to listen" def merge_with_exact_repetition(self, voice_dir, fmt, repeat_count, pause_q, pause_a): q_files = natsort.natsorted([f for f in os.listdir(voice_dir) if f.startswith("Q_") and f.endswith(f".{fmt.lower()}")]) a_files = natsort.natsorted([f for f in os.listdir(voice_dir) if f.startswith("A_") and f.endswith(f".{fmt.lower()}")]) if len(q_files) != len(a_files): print(f"❌ Mismatched Q ({len(q_files)}) and A ({len(a_files)}) files") return None merged = AudioSegment.empty() short_pause = AudioSegment.silent(duration=pause_q) long_pause = AudioSegment.silent(duration=pause_a) for q_file, a_file in zip(q_files, a_files): try: q_audio = AudioSegment.from_file(os.path.join(voice_dir, q_file)) a_audio = AudioSegment.from_file(os.path.join(voice_dir, a_file)) q_audio = q_audio.fade_in(50).fade_out(50) a_audio = a_audio.fade_in(50).fade_out(50) q_audio = normalize(q_audio) a_audio = normalize(a_audio) for _ in range(repeat_count): merged += q_audio merged += short_pause merged += a_audio merged += long_pause except Exception as e: print(f"❌ Error processing {q_file} or {a_file}: {str(e)}") return None merged = normalize(merged) merged = compress_dynamic_range(merged, threshold=-20.0, ratio=4.0) output_path = os.path.join(voice_dir, f"merged_repeat_{repeat_count}x.{fmt.lower()}") merged.export(output_path, format=fmt.lower(), bitrate="256k") return output_path def generate_full_srt(self, all_subs, pause_q, pause_a, audio_path, repeat_times): """Generate SRT for Q&A with exact repetition""" if not any(all_subs): return None vtt = webvtt.WebVTT() current_time = 0 for _ in range(repeat_times): for i in range(0, 
len(all_subs), 2): # Process Q q_subs = all_subs[i] if i < len(all_subs) else [] for sub in q_subs: start = current_time + sub["start"] end = current_time + sub["end"] vtt.captions.append(webvtt.Caption( self._format_time(start), self._format_time(end), sub["text"] )) current_time += (q_subs[-1]["end"] if q_subs else 0) + pause_q # Process A a_subs = all_subs[i+1] if i+1 < len(all_subs) else [] for sub in a_subs: start = current_time + sub["start"] end = current_time + sub["end"] vtt.captions.append(webvtt.Caption( self._format_time(start), self._format_time(end), sub["text"] )) current_time += (a_subs[-1]["end"] if a_subs else 0) + pause_a srt_path = audio_path.replace('.mp3', '.srt') vtt.save(srt_path) return srt_path # ==================== GRADIO INTERFACE ==================== def update_voice_dropdown(language, tab_name, char_num=None): processor = BaseTTSProcessor() voice_options = [v for v in processor.voice_map.keys() if v.startswith(language)] default_voice = voice_options[0] if voice_options else None if tab_name == "single": return gr.Dropdown(choices=voice_options, value=default_voice) elif tab_name == "multi": if char_num == 1: return gr.Dropdown(choices=voice_options, value=default_voice) elif char_num == 2: return gr.Dropdown(choices=voice_options, value=default_voice) elif char_num == 3: return gr.Dropdown(choices=voice_options, value=default_voice) elif tab_name == "dialogue": if char_num == "q": return gr.Dropdown(choices=voice_options, value=default_voice) elif char_num == "a": return gr.Dropdown(choices=voice_options, value=default_voice) def toggle_srt_download(audio_path, message): if audio_path and os.path.exists(audio_path.replace('.mp3', '.srt')): return gr.Button(visible=True), gr.Button(visible=True) return gr.Button(visible=False), gr.Button(visible=False) def show_subtitles(audio_output): """Xử lý mọi trường hợp đầu vào không hợp lệ""" # Nếu là số nguyên (sample rate), bỏ qua if isinstance(audio_output, (int, float)): return "⏳ Đang xử 
lý audio..." # Xử lý các trường hợp còn lại như trước if audio_output is None: return "⏳ Chưa có audio được tạo" if isinstance(audio_output, (tuple, list)) and len(audio_output) > 0: audio_path = audio_output[0] elif isinstance(audio_output, str): audio_path = audio_output else: return "⚠️ Định dạng đầu vào không hỗ trợ" if not isinstance(audio_path, str) or not audio_path.endswith('.mp3'): return f"⚠️ Đường dẫn audio không hợp lệ: {audio_path}" srt_path = audio_path.replace('.mp3', '.srt') if not os.path.exists(srt_path): return "⚠️ Không tìm thấy file phụ đề" try: with open(srt_path, 'r', encoding='utf-8') as f: return f.read() except Exception as e: return f"❌ Lỗi đọc phụ đề: {str(e)}" def toggle_srt_display(audio_path): if not audio_path: return gr.Button(visible=False), gr.Textbox(visible=False) srt_path = audio_path.replace('.mp3', '.srt') if os.path.exists(srt_path): return gr.Button(visible=True), gr.Textbox(visible=True) return gr.Button(visible=False), gr.Textbox(visible=False) def load_subtitles(audio_path): if not audio_path: return "" srt_path = audio_path.replace('.mp3', '.srt') try: with open(srt_path, 'r', encoding='utf-8') as f: return f.read() except: return "Không thể đọc file phụ đề" with gr.Blocks(title="TTS Story Generator") as app: gr.Markdown("