# backendprocesssuper / video2.py
# sreepathi-ravikumar's picture
# Update video2.py
# 70d5824 verified
from moviepy.editor import *
from moviepy.video.fx.all import speedx
from PIL import Image
import pytesseract
import numpy as np
import edge_tts
from mutagen.mp3 import MP3
import uuid
import os
from pathlib import Path
import rust_highlight
import rust_combiner
import shutil
import asyncio
import cv2
import numpy as np
import subprocess, shlex, os, time
import asyncio
import nest_asyncio
import edge_tts
import re
import html
import unicodedata
from pydub import AudioSegment
from pydub.effects import normalize
import tempfile
import os
import warnings
# from IPython.display import Video, display, HTML # Commented out for Hugging Face Spaces compatibility
import math
# Storage layout. Per the original note, /app/data is expected to have been
# created with suitable permissions already, so no chmod is attempted here.
BASE_DIR = "/app/data"
IMAGE_DIR = "/tmp/images"
os.makedirs(IMAGE_DIR, exist_ok=True)
AUDIO_DIR = os.path.join(BASE_DIR, "sound")
CLIPS_DIR = os.path.join(BASE_DIR, "video")
# Make sure the data root and both output folders exist before anything writes to them.
for path in (BASE_DIR, AUDIO_DIR, CLIPS_DIR):
    Path(path).mkdir(parents=True, exist_ok=True)
# Silence every warning category for the whole process.
warnings.filterwarnings('ignore')
# nest_asyncio patches asyncio so an event loop can be re-entered
# (presumably needed because asyncio.run is called from hosted code — TODO confirm).
nest_asyncio.apply()
import re
import html
import unicodedata
import tempfile
import os
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from functools import lru_cache
import edge_tts
from pydub import AudioSegment
from pydub.effects import normalize
from mutagen.mp3 import MP3
# Fallback voice used when no language-specific voice is supplied.
VOICE_EN = "en-IN-NeerjaNeural"
# Pre-compiled regex patterns for speed (compiled once, reused many times)
URL_PATTERN = re.compile(r'https?://[^\s<>"\']+|www\.[^\s<>"\']+')  # http(s):// and www. links
TAG_PATTERN = re.compile(r'<[^>]*>|[<>]')  # whole <...> tags plus stray angle brackets
BRACKET_PATTERN = re.compile(r'[\{\}\[\]]')  # curly and square brackets
SPECIAL_CHAR_PATTERN = re.compile(r'[#@$%^&*_+=|\\`~]')  # symbols dropped outright (includes backslash)
WHITESPACE_PATTERN = re.compile(r'\s+')  # any whitespace run
SENTENCE_PATTERN = re.compile(r'(?<=[.!?])\s+')  # whitespace following . ! or ?
SUB_PATTERN = re.compile(r'(?<=[,;:])\s+')  # whitespace following , ; or :
# Literal two-character escape sequences (a backslash followed by n, t or r)
# that sometimes survive in text and should be spoken as a pause, not letters.
_ESCAPED_WHITESPACE = ('\\n', '\\t', '\\r')
# Markup keywords to strip. They are matched as whole words only, in their
# all-lowercase and all-uppercase spellings, so ordinary words that merely
# contain one of them (e.g. "invoice", "speaker") are left intact.
_MARKUP_KEYWORDS = ('voice', 'speak', 'prosody', 'ssml', 'xmlns')
_MARKUP_KEYWORD_PATTERN = re.compile(
    r'\b(?:'
    + '|'.join(_MARKUP_KEYWORDS + tuple(word.upper() for word in _MARKUP_KEYWORDS))
    + r')\b'
)


@lru_cache(maxsize=1024)  # Cache cleaned text to avoid re-processing
def clean_text_for_tts(text):
    """Return *text* stripped of markup and symbols before it is sent to TTS.

    Processing order:
      1. HTML entities are unescaped.
      2. Literal ``\\n`` / ``\\t`` / ``\\r`` escape sequences become spaces.
      3. URLs, tags, brackets and special symbols are removed.
      4. Whole-word markup keywords (voice, speak, prosody, ssml, xmlns) are
         removed.
      5. The text is NFKD-normalized and whitespace runs collapse to one space.

    Args:
        text: Value to clean. Falsy values yield ``""``; anything else is
            converted with ``str()``. Must be hashable because the function
            is wrapped in ``lru_cache``.

    Returns:
        The cleaned string, possibly empty.
    """
    if not text:
        return ""
    text = html.unescape(str(text).strip())
    # This must run BEFORE SPECIAL_CHAR_PATTERN: that pattern deletes every
    # backslash, after which the escape sequences can no longer be recognised
    # and "a\nb" would collapse into "anb".
    for escaped in _ESCAPED_WHITESPACE:
        text = text.replace(escaped, ' ')
    text = URL_PATTERN.sub('', text)
    text = TAG_PATTERN.sub('', text)
    text = BRACKET_PATTERN.sub('', text)
    text = SPECIAL_CHAR_PATTERN.sub('', text)
    # Whole-word match so that e.g. "invoice" is not cut down to "in".
    text = _MARKUP_KEYWORD_PATTERN.sub('', text)
    text = unicodedata.normalize('NFKD', text)
    text = WHITESPACE_PATTERN.sub(' ', text)
    return text.strip()
async def generate_safe_audio(text, voice, semaphore):
    """Synthesize one piece of text to a temporary MP3 file.

    The whole operation runs while holding *semaphore*, which is how the
    caller limits the number of simultaneous TTS requests.

    Args:
        text: Raw text; it is cleaned with ``clean_text_for_tts`` first.
        voice: Voice name handed to ``edge_tts.Communicate``.
        semaphore: Async context manager used as the concurrency limiter.

    Returns:
        Path of the generated ``.mp3`` temp file, or ``None`` when the cleaned
        text is empty or synthesis raised (the temp file is removed then).
    """
    async with semaphore:
        speakable = clean_text_for_tts(text)
        if not speakable:
            return None
        # Reserve a unique path; delete=False keeps the (empty) file on disk
        # after the handle is closed so the TTS client can write to it.
        with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as placeholder:
            target_path = placeholder.name
        try:
            await edge_tts.Communicate(speakable, voice=voice).save(target_path)
        except Exception as e:
            print(f"Error generating audio: {e}")
            # Do not leave the placeholder file behind on failure.
            if os.path.exists(target_path):
                os.unlink(target_path)
            return None
        return target_path
@lru_cache(maxsize=256)
def smart_text_chunking(text, max_chars=80):
    """Split cleaned text into chunks of at most *max_chars* characters.

    The text is cleaned with ``clean_text_for_tts`` and then split in three
    progressively finer passes: on sentence ends, then (for sentences that
    are still too long) after commas/semicolons/colons, then word by word.
    A single word longer than *max_chars* is emitted as its own chunk.

    Args:
        text: Text to split; must be hashable because of ``lru_cache``.
        max_chars: Target maximum chunk length.

    Returns:
        A tuple of non-empty chunk strings (a tuple so the cached value is
        immutable); empty when nothing remains after cleaning.
    """
    cleaned = clean_text_for_tts(text)
    if not cleaned:
        return tuple()

    pieces = []
    for raw_sentence in SENTENCE_PATTERN.split(cleaned):
        sentence = raw_sentence.strip()
        if not sentence:
            continue
        if len(sentence) <= max_chars:
            pieces.append(sentence)
            continue

        # Sentence too long: fall back to clause boundaries.
        for raw_clause in SUB_PATTERN.split(sentence):
            clause = raw_clause.strip()
            if not clause:
                continue
            if len(clause) <= max_chars:
                pieces.append(clause)
                continue

            # Clause still too long: pack whole words greedily.
            packed = ""
            for word in clause.split():
                candidate = word if not packed else f"{packed} {word}"
                if len(candidate) <= max_chars:
                    packed = candidate
                    continue
                if packed:
                    pieces.append(packed.strip())
                packed = word
            if packed:
                pieces.append(packed.strip())

    return tuple(piece for piece in pieces if piece.strip())
def process_audio_segment_fast(audio_file):
    """Load, normalize and silence-trim one audio file, then delete the file.

    Intended to be run in a worker thread. The source file is always removed,
    whether or not processing succeeded.

    Args:
        audio_file: Path of the audio file to load.

    Returns:
        The processed ``AudioSegment``, or ``None`` if loading or normalizing
        failed. A failure while trimming silence is not fatal: the untrimmed
        (normalized) segment is returned instead.
    """
    try:
        segment = normalize(AudioSegment.from_file(audio_file))
        # Only strip silence for longer segments
        # (pydub reports len() of a segment in milliseconds).
        if len(segment) > 200:
            try:
                segment = segment.strip_silence(silence_len=50, silence_thresh=-40)
            except Exception as strip_error:
                # Best-effort step: keep the untrimmed segment, but say why.
                # `Exception` (not a bare except) lets KeyboardInterrupt and
                # SystemExit propagate.
                print(f"Warning: Could not strip silence: {strip_error}")
        return segment
    except Exception as e:
        print(f"Warning: Error processing audio segment: {e}")
        return None
    finally:
        # Cleanup temp file immediately. A single unlink avoids the
        # check-then-delete race; an already-missing file is fine.
        try:
            os.unlink(audio_file)
        except FileNotFoundError:
            pass
        except Exception as cleanup_error:
            print(f"Warning: Could not remove temp file {audio_file}: {cleanup_error}")
async def bilingual_tts_optimized(text, output_file="audio0.mp3", VOICE_TA=None, max_concurrent=10):
    """Synthesize *text* chunk by chunk and merge the result into one MP3.

    Steps: chunk the text, synthesize every chunk concurrently (bounded by a
    semaphore), post-process each clip in a thread pool, join the clips with
    200 ms of silence between them, compress and normalize the result, and
    export it at 192k.

    Args:
        text: Text to speak.
        output_file: Destination path of the merged MP3.
        VOICE_TA: Voice name to use; ``VOICE_EN`` is used when it is falsy.
        max_concurrent: Maximum number of simultaneous TTS requests.

    Returns:
        *output_file* on success, ``None`` on any failure (errors are printed,
        never raised).
    """
    print("Starting optimized bilingual TTS processing...")
    try:
        chunks = smart_text_chunking(text)
        if not chunks:
            print("Error: No valid text chunks after cleaning")
            return None
        print(f"Processing {len(chunks)} text chunks with max {max_concurrent} concurrent requests...")

        tamil_voice_selected = VOICE_TA is not None and "ta-IN" in VOICE_TA
        # Caps the number of in-flight TTS requests (prevents rate limiting).
        limiter = asyncio.Semaphore(max_concurrent)

        def voice_for(chunk):
            # True when the chunk contains any character from the Tamil Unicode block.
            has_tamil = any('\u0B80' <= ch <= '\u0BFF' for ch in chunk)
            # NOTE(review): both outcomes resolve to VOICE_TA whenever it is
            # provided, so the Tamil-script check currently has no effect on
            # the chosen voice — confirm whether non-Tamil chunks were meant
            # to use VOICE_EN.
            if tamil_voice_selected and has_tamil:
                return VOICE_TA
            return VOICE_TA or VOICE_EN

        jobs = [generate_safe_audio(chunk, voice_for(chunk), limiter) for chunk in chunks]
        # return_exceptions=True: a failed chunk yields an exception object
        # instead of aborting the whole batch.
        results = await asyncio.gather(*jobs, return_exceptions=True)

        # Keep only real file paths (drops None results and exception objects).
        generated_paths = [item for item in results if isinstance(item, str) and item]
        if not generated_paths:
            print("Error: No audio was successfully generated")
            return None
        print(f"Successfully generated {len(generated_paths)} audio segments")

        # Post-process the clips in parallel, at most 8 worker threads.
        worker_count = min(len(generated_paths), 8)
        with ThreadPoolExecutor(max_workers=worker_count) as pool:
            processed = list(pool.map(process_audio_segment_fast, generated_paths))
        segments = [clip for clip in processed if clip is not None]
        if not segments:
            print("Error: No audio segments were successfully processed")
            return None

        print("Merging audio segments...")
        gap = AudioSegment.silent(duration=200)
        merged = segments[0]
        for clip in segments[1:]:
            merged += gap + clip

        print("Applying final audio processing...")
        merged = merged.compress_dynamic_range(
            threshold=-20.0,
            ratio=4.0,
            attack=5.0,
            release=50.0
        )
        merged = normalize(merged)

        merged.export(output_file, format="mp3", bitrate="192k")
        print(f"✅ Audio successfully generated: {output_file}")
        return output_file
    except Exception as main_error:
        print(f"Main error in bilingual TTS: {main_error}")
        return None
# Language name -> TTS voice name.
_VOICE_BY_LANGUAGE = {
    "English": "en-US-JennyNeural",
    "Tamil": "ta-IN-PallaviNeural",
    "Hindi": "hi-IN-SwaraNeural",
    "Malayalam": "ml-IN-SobhanaNeural",
    "Kannada": "kn-IN-SapnaNeural",
    "Telugu": "te-IN-ShrutiNeural",
    "Bengali": "bn-IN-TanishaaNeural",
    "Marathi": "mr-IN-AarohiNeural",
    "Gujarati": "gu-IN-DhwaniNeural",
    "Punjabi": "pa-IN-VaaniNeural",
    "Urdu": "ur-IN-GulNeural",
    "French": "fr-FR-DeniseNeural",
    "German": "de-DE-KatjaNeural",
    "Spanish": "es-ES-ElviraNeural",
    "Italian": "it-IT-IsabellaNeural",
    "Russian": "ru-RU-SvetlanaNeural",
    "Japanese": "ja-JP-NanamiNeural",
    "Korean": "ko-KR-SunHiNeural",
    "Chinese": "zh-CN-XiaoxiaoNeural",
    "Arabic": "ar-SA-ZariyahNeural",
    "Portuguese": "pt-BR-FranciscaNeural",
    "Dutch": "nl-NL-FennaNeural",
    "Greek": "el-GR-AthinaNeural",
    "Hebrew": "he-IL-HilaNeural",
    "Turkish": "tr-TR-EmelNeural",
    "Polish": "pl-PL-AgnieszkaNeural",
    "Thai": "th-TH-AcharaNeural",
    "Vietnamese": "vi-VN-HoaiMyNeural",
    "Swedish": "sv-SE-SofieNeural",
    "Finnish": "fi-FI-NooraNeural",
    "Czech": "cs-CZ-VlastaNeural",
    "Hungarian": "hu-HU-NoemiNeural",
}


async def generate_tts_optimized(id, lines, lang):
    """Generate the audio file for one item and report its duration.

    Args:
        id: Used both as the index into *lines* and in the output file name
            (``audio{id}.mp3`` inside ``AUDIO_DIR``).
        lines: Indexable collection of texts.
        lang: Either a language name, or a string of the form
            ``"<text>&&&<language name>"``; in the second form the text
            before ``&&&`` is spoken instead of ``lines[id]``.

    Returns:
        ``(duration, audio_path)`` on success, where duration is read from the
        MP3 metadata (mutagen reports it in seconds); ``(None, None)`` when
        generation failed or the file does not exist.
    """
    audio_path = os.path.join(AUDIO_DIR, f"audio{id}.mp3")

    if "&&&" in lang:
        # Inline form: the text to speak and its language travel together.
        fields = lang.split("&&&")
        text = fields[0].strip()
        language = fields[1].strip()
    else:
        text = lines[id]
        language = lang
    # Unknown languages fall back to the default voice.
    voice_to_use = _VOICE_BY_LANGUAGE.get(language, VOICE_EN)

    # max_concurrent trades speed against load; the original note suggests
    # tuning it to the host system.
    output = await bilingual_tts_optimized(text, audio_path, voice_to_use, max_concurrent=15)
    if not (output and os.path.exists(audio_path)):
        return None, None
    return MP3(audio_path).info.length, audio_path
def audio_func(id, lines, lang):
    """Blocking entry point: run ``generate_tts_optimized`` to completion.

    Takes the same arguments as ``generate_tts_optimized`` and returns its
    result unchanged.
    """
    tts_job = generate_tts_optimized(id, lines, lang)
    return asyncio.run(tts_job)
#-----------------------------
#---------------------------------
import os
import subprocess
import shlex
import time
import math
import numpy as np
import cv2
from moviepy.editor import VideoFileClip, AudioFileClip
from moviepy.video.fx.speedx import speedx
# video.py
def video_func(id, lines, lang):
    """Produce the video clip for one item: audio first, then the clip.

    Args:
        id: Index into *lines*; also forwarded to the audio and clip generators.
        lines: Indexable collection of texts; ``lines[id]`` is the clip text.
        lang: Language selector forwarded to ``audio_func``.

    Returns:
        Whatever path ``rust_highlight.generate_video_clip`` reports, or
        ``None`` when audio generation or clip generation failed.
    """
    duration, audio_path = audio_func(id, lines, lang)
    if not (duration and audio_path):
        print("Failed to generate audio.")
        return None

    clip_text = lines[id]
    print("-----------------------------------------------------------------------------")
    print(clip_text)

    # Make sure the clips folder exists before the generator writes into it.
    os.makedirs(CLIPS_DIR, exist_ok=True)

    # Clip rendering is delegated to the rust_highlight module
    # (per the original comment, a Rust function).
    final_video_path = rust_highlight.generate_video_clip(id, clip_text, audio_path, duration, CLIPS_DIR)
    if not final_video_path:
        print("Video generation failed.")
        return None
    print(f"Final video saved at: {final_video_path}")
    return final_video_path