|
|
from moviepy.editor import * |
|
|
from moviepy.video.fx.all import speedx |
|
|
from PIL import Image |
|
|
import pytesseract |
|
|
import numpy as np |
|
|
import edge_tts |
|
|
from mutagen.mp3 import MP3 |
|
|
import uuid |
|
|
import os |
|
|
from pathlib import Path |
|
|
import rust_highlight |
|
|
import rust_combiner |
|
|
import shutil |
|
|
import asyncio |
|
|
import cv2 |
|
|
import numpy as np |
|
|
import subprocess, shlex, os, time |
|
|
import asyncio |
|
|
import nest_asyncio |
|
|
import edge_tts |
|
|
import re |
|
|
import html |
|
|
import unicodedata |
|
|
from pydub import AudioSegment |
|
|
from pydub.effects import normalize |
|
|
import tempfile |
|
|
import os |
|
|
import warnings |
|
|
|
|
|
import math |
|
|
|
|
|
# --- Storage layout -------------------------------------------------------
# Root data folder; generated audio and video clips live in sub-folders of it.
BASE_DIR = "/app/data"
AUDIO_DIR = os.path.join(BASE_DIR, "sound")
CLIPS_DIR = os.path.join(BASE_DIR, "video")

# Scratch folder for images, created eagerly at import time.
IMAGE_DIR = "/tmp/images"
os.makedirs(IMAGE_DIR, exist_ok=True)

# Make sure every data folder exists before any function writes into it.
for path in [BASE_DIR, AUDIO_DIR, CLIPS_DIR]:
    Path(path).mkdir(parents=True, exist_ok=True)

# Silence all Python warnings for the whole process.
warnings.filterwarnings('ignore')

# Patch asyncio so an event loop can be re-entered (nested run calls).
nest_asyncio.apply()
|
|
|
|
|
import re |
|
|
import html |
|
|
import unicodedata |
|
|
import tempfile |
|
|
import os |
|
|
import asyncio |
|
|
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor |
|
|
from functools import lru_cache |
|
|
import edge_tts |
|
|
from pydub import AudioSegment |
|
|
from pydub.effects import normalize |
|
|
from mutagen.mp3 import MP3 |
|
|
|
|
|
# Default narration voice, used whenever no language-specific voice applies.
VOICE_EN = "en-IN-NeerjaNeural"

# Pre-compiled patterns shared by the text-cleaning and chunking helpers.
URL_PATTERN = re.compile(r'https?://[^\s<>"\']+|www\.[^\s<>"\']+')
TAG_PATTERN = re.compile(r'<[^>]*>|[<>]')
BRACKET_PATTERN = re.compile(r'[\{\}\[\]]')
SPECIAL_CHAR_PATTERN = re.compile(r'[#@$%^&*_+=|\\`~]')
WHITESPACE_PATTERN = re.compile(r'\s+')
SENTENCE_PATTERN = re.compile(r'(?<=[.!?])\s+')
SUB_PATTERN = re.compile(r'(?<=[,;:])\s+')

# Literal two-character escape sequences (backslash + n/t/r) found in raw text.
_ESCAPE_SEQ_PATTERN = re.compile(r'\\[ntr]')

# SSML keywords that are stripped when they appear as stand-alone words,
# in all-lowercase or all-uppercase form.
_SSML_KEYWORDS = ('voice', 'speak', 'prosody', 'ssml', 'xmlns')
_SSML_KEYWORD_PATTERN = re.compile(
    r'\b(?:' + '|'.join(f"{kw}|{kw.upper()}" for kw in _SSML_KEYWORDS) + r')\b'
)


@lru_cache(maxsize=1024)
def clean_text_for_tts(text):
    """Return *text* stripped of markup and symbols that should not be spoken.

    Processing order:
      1. HTML entities are unescaped.
      2. Literal ``\\n`` / ``\\t`` / ``\\r`` sequences become spaces.
      3. URLs, tags, brackets and special symbols are removed.
      4. Stand-alone SSML keywords are removed.
      5. The text is NFKD-normalised and whitespace runs are collapsed.

    Results are memoised with ``lru_cache``, so *text* must be hashable.

    Args:
        text: Raw text; any falsy value yields an empty string.

    Returns:
        The cleaned, stripped string (possibly empty).
    """
    if not text:
        return ""

    text = html.unescape(str(text).strip())

    # Must run BEFORE the special-character pass: that pass deletes every
    # backslash, after which "\n" would be left behind as a stray "n"
    # glued to the neighbouring words.
    text = _ESCAPE_SEQ_PATTERN.sub(' ', text)

    text = URL_PATTERN.sub('', text)
    text = TAG_PATTERN.sub('', text)
    text = BRACKET_PATTERN.sub('', text)
    text = SPECIAL_CHAR_PATTERN.sub('', text)

    # Whole-word match only, so ordinary words that merely contain a keyword
    # ("invoice", "speaker") are left intact.
    text = _SSML_KEYWORD_PATTERN.sub('', text)

    # NOTE(review): NFKD decomposes precomposed characters (accents, Hangul
    # syllables) - confirm every configured voice handles decomposed input.
    text = unicodedata.normalize('NFKD', text)
    text = WHITESPACE_PATTERN.sub(' ', text)
    return text.strip()
|
|
|
|
|
async def generate_safe_audio(text, voice, semaphore):
    """Synthesise *text* with edge-tts into a temporary MP3 file.

    The whole operation runs while holding *semaphore*, which bounds how many
    syntheses are in flight at once.

    Args:
        text: Raw text; it is cleaned with ``clean_text_for_tts`` first.
        voice: Voice name handed to ``edge_tts.Communicate``.
        semaphore: Async context manager used as the concurrency limiter.

    Returns:
        Path of the generated ``.mp3`` file, or ``None`` when the cleaned
        text is empty or synthesis fails (the temp file is removed then).
    """
    async with semaphore:
        speech_text = clean_text_for_tts(text)
        if not speech_text:
            return None

        # Reserve a unique .mp3 path; the handle is closed right away so
        # edge-tts can write to the file by name.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as handle:
            mp3_path = handle.name

        try:
            communicator = edge_tts.Communicate(speech_text, voice=voice)
            await communicator.save(mp3_path)
        except Exception as e:
            print(f"Error generating audio: {e}")
            # Do not leave a partial or empty file behind.
            if os.path.exists(mp3_path):
                os.unlink(mp3_path)
            return None

        return mp3_path
|
|
|
|
|
@lru_cache(maxsize=256)
def smart_text_chunking(text, max_chars=80):
    """Split *text* into a tuple of chunks for separate TTS requests.

    The text is cleaned first, then split on sentence boundaries. A sentence
    longer than *max_chars* is split after ``,``/``;``/``:``; a fragment that
    is still too long is packed greedily word by word. A single word longer
    than *max_chars* is kept whole as its own chunk.

    Results are memoised with ``lru_cache``.

    Args:
        text: Raw text to chunk.
        max_chars: Target maximum chunk length in characters.

    Returns:
        Tuple of non-empty chunk strings (empty tuple if nothing remains
        after cleaning).
    """
    cleaned = clean_text_for_tts(text)
    if not cleaned:
        return ()

    def pack_words(fragment):
        """Greedily join words of *fragment* into lines of <= max_chars."""
        lines_out = []
        line = ""
        for token in fragment.split():
            candidate = f"{line} {token}" if line else token
            if len(candidate) <= max_chars:
                line = candidate
                continue
            # Adding the token would overflow: flush what we have so far.
            if line:
                lines_out.append(line.strip())
            line = token
        if line:
            lines_out.append(line.strip())
        return lines_out

    pieces = []
    for sentence in map(str.strip, SENTENCE_PATTERN.split(cleaned)):
        if not sentence:
            continue
        if len(sentence) <= max_chars:
            pieces.append(sentence)
            continue

        # Sentence too long: fall back to clause-level splitting.
        for fragment in map(str.strip, SUB_PATTERN.split(sentence)):
            if not fragment:
                continue
            if len(fragment) <= max_chars:
                pieces.append(fragment)
            else:
                pieces.extend(pack_words(fragment))

    return tuple(piece for piece in pieces if piece.strip())
|
|
|
|
|
def process_audio_segment_fast(audio_file):
    """Load, normalise and silence-trim one audio file, then delete the file.

    Args:
        audio_file: Path of the audio file to load. The file is always
            removed before this function returns, whether or not
            processing succeeded.

    Returns:
        The processed ``AudioSegment``, or ``None`` if the file could not be
        loaded or normalised.
    """
    try:
        segment = normalize(AudioSegment.from_file(audio_file))

        # Very short clips (<= 200 ms) are left untouched.
        if len(segment) > 200:
            try:
                # pydub rejects a padding larger than silence_len, and its
                # default padding is 100 ms, so it has to be passed
                # explicitly for a 50 ms silence_len to work at all.
                stripped = segment.strip_silence(
                    silence_len=50, silence_thresh=-40, padding=50
                )
            except Exception as strip_error:
                # Best effort: keep the un-stripped audio, but say why.
                print(f"Warning: Silence stripping skipped: {strip_error}")
            else:
                # An all-silent clip strips down to nothing; keep the
                # original rather than returning an empty segment.
                if len(stripped) > 0:
                    segment = stripped

        return segment
    except Exception as e:
        print(f"Warning: Error processing audio segment: {e}")
        return None
    finally:
        # The input is a temporary file owned by this pipeline step.
        try:
            if os.path.exists(audio_file):
                os.unlink(audio_file)
        except OSError as cleanup_error:
            print(f"Warning: Could not remove temp file {audio_file}: {cleanup_error}")
|
|
|
|
|
def _pick_chunk_voice(chunk, requested_voice):
    """Return the voice for one chunk, switching to English only in Tamil mode."""
    if not requested_voice:
        return VOICE_EN
    if "ta-IN" not in requested_voice:
        return requested_voice

    has_tamil = any('\u0B80' <= char <= '\u0BFF' for char in chunk)
    has_latin = any(char.isascii() and char.isalpha() for char in chunk)
    # Only chunks that are identifiably English leave the Tamil voice;
    # digit- or punctuation-only chunks stay with the requested voice.
    if has_latin and not has_tamil:
        return VOICE_EN
    return requested_voice


async def bilingual_tts_optimized(text, output_file="audio0.mp3", VOICE_TA=None, max_concurrent=10):
    """Synthesise *text* chunk by chunk and merge the result into one MP3.

    The text is chunked with ``smart_text_chunking``, each chunk is
    synthesised concurrently through ``generate_safe_audio``, the resulting
    files are post-processed in a thread pool, joined with 200 ms pauses,
    compressed, normalised and exported.

    Args:
        text: Text to speak.
        output_file: Destination path of the merged MP3.
        VOICE_TA: Voice to use; falls back to ``VOICE_EN`` when falsy. When
            it is a ``ta-IN`` voice, chunks containing Latin letters and no
            Tamil characters are spoken with ``VOICE_EN`` instead.
        max_concurrent: Maximum number of simultaneous synthesis requests.

    Returns:
        *output_file* on success, ``None`` on any failure (errors are
        printed, never raised).
    """
    print("Starting optimized bilingual TTS processing...")

    try:
        chunks = smart_text_chunking(text)
        if not chunks:
            print("Error: No valid text chunks after cleaning")
            return None

        print(f"Processing {len(chunks)} text chunks with max {max_concurrent} concurrent requests...")

        semaphore = asyncio.Semaphore(max_concurrent)
        tasks = [
            generate_safe_audio(chunk, _pick_chunk_voice(chunk, VOICE_TA), semaphore)
            for chunk in chunks
        ]

        # gather() preserves task order, so the audio stays in text order.
        audio_files = await asyncio.gather(*tasks, return_exceptions=True)

        # Failed chunks come back as None or as an exception object.
        processed_audio_files = [f for f in audio_files if isinstance(f, str) and f]

        if not processed_audio_files:
            print("Error: No audio was successfully generated")
            return None

        failed_count = len(chunks) - len(processed_audio_files)
        if failed_count:
            print(f"Warning: {failed_count} of {len(chunks)} text chunks could not be synthesised and will be missing from the audio")

        print(f"Successfully generated {len(processed_audio_files)} audio segments")

        with ThreadPoolExecutor(max_workers=min(len(processed_audio_files), 8)) as executor:
            audio_segments = list(executor.map(process_audio_segment_fast, processed_audio_files))

        audio_segments = [seg for seg in audio_segments if seg is not None]

        if not audio_segments:
            print("Error: No audio segments were successfully processed")
            return None

        print("Merging audio segments...")
        merged_audio = audio_segments[0]
        pause = AudioSegment.silent(duration=200)

        for segment in audio_segments[1:]:
            merged_audio += pause + segment

        print("Applying final audio processing...")
        merged_audio = merged_audio.compress_dynamic_range(
            threshold=-20.0,
            ratio=4.0,
            attack=5.0,
            release=50.0
        )
        merged_audio = normalize(merged_audio)

        merged_audio.export(output_file, format="mp3", bitrate="192k")
        print(f"✅ Audio successfully generated: {output_file}")

        return output_file

    except Exception as main_error:
        print(f"Main error in bilingual TTS: {main_error}")
        return None
|
|
|
|
|
async def generate_tts_optimized(id, lines, lang):
    """Generate the narration MP3 for one item and report its duration.

    Args:
        id: Used both as the index into *lines* and in the output file name
            (``audio{id}.mp3`` inside ``AUDIO_DIR``).
        lines: Collection of texts, indexed by *id*.
        lang: Either a language name (e.g. ``"Tamil"``), or a string of the
            form ``"<text>&&&<language>"`` that carries its own text.

    Returns:
        ``(duration_seconds, audio_path)`` on success, ``(None, None)`` when
        no audio file was produced.
    """
    # Language name -> edge-tts voice. Unknown languages fall back to VOICE_EN.
    voice_by_language = {
        "English": "en-US-JennyNeural",
        "Tamil": "ta-IN-PallaviNeural",
        "Hindi": "hi-IN-SwaraNeural",
        "Malayalam": "ml-IN-SobhanaNeural",
        "Kannada": "kn-IN-SapnaNeural",
        "Telugu": "te-IN-ShrutiNeural",
        "Bengali": "bn-IN-TanishaaNeural",
        "Marathi": "mr-IN-AarohiNeural",
        "Gujarati": "gu-IN-DhwaniNeural",
        "Punjabi": "pa-IN-VaaniNeural",
        "Urdu": "ur-IN-GulNeural",
        "French": "fr-FR-DeniseNeural",
        "German": "de-DE-KatjaNeural",
        "Spanish": "es-ES-ElviraNeural",
        "Italian": "it-IT-IsabellaNeural",
        "Russian": "ru-RU-SvetlanaNeural",
        "Japanese": "ja-JP-NanamiNeural",
        "Korean": "ko-KR-SunHiNeural",
        "Chinese": "zh-CN-XiaoxiaoNeural",
        "Arabic": "ar-SA-ZariyahNeural",
        "Portuguese": "pt-BR-FranciscaNeural",
        "Dutch": "nl-NL-FennaNeural",
        "Greek": "el-GR-AthinaNeural",
        "Hebrew": "he-IL-HilaNeural",
        "Turkish": "tr-TR-EmelNeural",
        "Polish": "pl-PL-AgnieszkaNeural",
        "Thai": "th-TH-AcharaNeural",
        "Vietnamese": "vi-VN-HoaiMyNeural",
        "Swedish": "sv-SE-SofieNeural",
        "Finnish": "fi-FI-NooraNeural",
        "Czech": "cs-CZ-VlastaNeural",
        "Hungarian": "hu-HU-NoemiNeural",
    }

    audio_path = os.path.join(AUDIO_DIR, f"audio{id}.mp3")

    if "&&&" not in lang:
        # Plain language name: the text comes from the lines collection.
        text = lines[id]
        language = lang
    else:
        # "<text>&&&<language>": the text travels inside the lang argument.
        fields = lang.split("&&&")
        text = fields[0].strip()
        language = fields[1].strip()
    voice_to_use = voice_by_language.get(language, VOICE_EN)

    result = await bilingual_tts_optimized(text, audio_path, voice_to_use, max_concurrent=15)

    if not (result and os.path.exists(audio_path)):
        return None, None

    # Read the real duration back from the written file.
    return MP3(audio_path).info.length, audio_path
|
|
|
|
|
def audio_func(id, lines, lang):
    """Blocking entry point: run ``generate_tts_optimized`` to completion.

    Takes the same arguments as ``generate_tts_optimized`` and returns its
    result unchanged.
    """
    tts_job = generate_tts_optimized(id, lines, lang)
    return asyncio.run(tts_job)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
import subprocess |
|
|
import shlex |
|
|
import time |
|
|
import math |
|
|
import numpy as np |
|
|
import cv2 |
|
|
from moviepy.editor import VideoFileClip, AudioFileClip |
|
|
from moviepy.video.fx.speedx import speedx |
|
|
|
|
|
|
|
|
def video_func(id, lines, lang):
    """Create the narrated video clip for ``lines[id]``.

    Audio is generated first via ``audio_func``; the clip itself is rendered
    by ``rust_highlight.generate_video_clip`` into ``CLIPS_DIR``.

    Args:
        id: Index into *lines*; also forwarded to the audio and video steps.
        lines: Collection of texts, indexed by *id*.
        lang: Language selector, forwarded to ``audio_func``.

    Returns:
        Path of the rendered clip, or ``None`` if audio or video generation
        failed.
    """
    duration, audio_path = audio_func(id, lines, lang)
    if not (duration and audio_path):
        print("Failed to generate audio.")
        return None

    TEXT = lines[id]
    print("-----------------------------------------------------------------------------")
    print(TEXT)

    # The output folder may have been removed since import time.
    os.makedirs(CLIPS_DIR, exist_ok=True)

    final_video_path = rust_highlight.generate_video_clip(id, TEXT, audio_path, duration, CLIPS_DIR)

    if not final_video_path:
        print("Video generation failed.")
        return None

    print(f"Final video saved at: {final_video_path}")
    return final_video_path