"""Gradio web app: convert text (typed, TXT, or EPUB) to speech via Microsoft Edge TTS.

Long texts are automatically split into segments, synthesized one by one with
progress/ETA reporting, then merged back into a single MP3 (optionally
transcoded to M4B with ffmpeg).
"""

import asyncio
import io  # NOTE(review): currently unused; kept to avoid breaking any external expectation
import logging
import math
import os
import re
import shutil
import subprocess
import tempfile
import time
from datetime import datetime, timedelta

import gradio as gr
import edge_tts
from pydub import AudioSegment

from text_cleaning import TextCleaner

# EPUB parsing (optional dependency)
try:
    import ebooklib
    from ebooklib import epub
    from bs4 import BeautifulSoup
    EPUB_SUPPORT = True
except ImportError:
    EPUB_SUPPORT = False
    logging.warning("ebooklib or beautifulsoup4 not installed. EPUB support disabled.")

# Encoding detection (optional dependency)
try:
    import chardet
    CHARDET_SUPPORT = True
except ImportError:
    CHARDET_SUPPORT = False
    logging.warning("chardet not installed. Encoding detection will use fallback method.")

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)


def detect_file_encoding(file_path):
    """Detect the text encoding of *file_path*.

    Returns a normalized encoding name when chardet is available and detects
    one, otherwise ``None`` (caller falls back to a priority list).
    """
    if not CHARDET_SUPPORT:
        # Fallback: caller will try common encodings in order.
        return None

    with open(file_path, 'rb') as f:
        raw_data = f.read()
    result = chardet.detect(raw_data)
    encoding = result['encoding']
    confidence = result['confidence']
    logger.info(f"Detected encoding: {encoding} (confidence: {confidence:.2%})")

    if not encoding:
        return None

    # Map common aliases to the superset / standard codec name.
    encoding_map = {
        'gb2312': 'gbk',          # GBK is a superset of GB2312
        'gb18030': 'gb18030',
        'ascii': 'utf-8',         # ASCII is a subset of UTF-8
        'iso-8859-1': 'latin-1',
        'windows-1252': 'cp1252',
    }
    return encoding_map.get(encoding.lower(), encoding)


def read_text_file_with_encoding(file_path):
    """Read a text file, trying the detected encoding first and then a
    priority list of common encodings.

    Returns the decoded text, or ``None`` if every encoding failed.
    """
    detected_encoding = detect_file_encoding(file_path)

    # Priority list of encodings to try.
    # Common for Chinese: UTF-8, GBK, GB18030; Western: UTF-8, Latin-1, CP1252.
    encodings_to_try = []
    if detected_encoding:
        encodings_to_try.append(detected_encoding)
    encodings_to_try.extend([
        'utf-8',
        'utf-8-sig',   # UTF-8 with BOM
        'gbk',         # Chinese (simplified)
        'gb18030',     # Chinese (extended)
        'big5',        # Chinese (traditional)
        'utf-16',
        'latin-1',     # Western European
        'cp1252',      # Windows Western
        'shift_jis',   # Japanese
        'euc-kr',      # Korean
    ])

    # Remove duplicates while preserving order (case-insensitive).
    seen = set()
    unique_encodings = []
    for enc in encodings_to_try:
        if enc and enc.lower() not in seen:
            seen.add(enc.lower())
            unique_encodings.append(enc)

    last_error = None
    for encoding in unique_encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                text = f.read()
            # Validate: reject decodings that produced too many U+FFFD
            # replacement characters (>10% of the text).
            if text.count('\ufffd') > len(text) * 0.1:
                logger.debug(f"Encoding {encoding} produced too many replacement characters, trying next...")
                continue
            logger.info(f"Successfully read file with encoding: {encoding}")
            return text
        except (UnicodeDecodeError, LookupError) as e:
            last_error = e
            logger.debug(f"Failed to decode with {encoding}: {e}")
            continue

    logger.error(f"Failed to decode file with any encoding. Last error: {last_error}")
    return None


def parse_uploaded_file(file_path):
    """Parse an uploaded .txt or .epub file.

    Returns ``(text, base_filename)``; *text* is ``None`` on failure,
    and both are ``None`` when no file was provided or the extension is
    unsupported.
    """
    if file_path is None:
        return None, None

    filename = os.path.splitext(os.path.basename(file_path))[0]
    ext = os.path.splitext(file_path)[1].lower()

    if ext == '.txt':
        text = read_text_file_with_encoding(file_path)
        if text:
            # FIX: log the actual filename instead of a "(unknown)" placeholder.
            logger.info(f"Parsed TXT file: {filename}, {len(text)} chars")
            return text, filename
        else:
            logger.error(f"Failed to decode TXT file: {filename}")
            return None, filename

    elif ext == '.epub':
        if not EPUB_SUPPORT:
            logger.error("EPUB support not available")
            return None, filename
        try:
            book = epub.read_epub(file_path)
            text_parts = []
            for item in book.get_items():
                if item.get_type() == ebooklib.ITEM_DOCUMENT:
                    soup = BeautifulSoup(item.get_content(), 'html.parser')
                    text_parts.append(soup.get_text(separator='\n'))
            text = '\n\n'.join(text_parts)
            logger.info(f"Parsed EPUB file: {filename}, {len(text)} chars")
            return text, filename
        except Exception as e:
            logger.error(f"Failed to parse EPUB: {e}")
            return None, filename

    return None, None


async def convert_to_m4b(mp3_path, output_filename):
    """Convert an MP3 to M4B using ffmpeg directly (supports large files).

    *output_filename* is currently unused (the caller renames the result);
    kept for interface compatibility. Returns the M4B temp-file path, or
    ``None`` on any failure.

    NOTE(review): ``subprocess.run`` blocks the event loop for the duration
    of the transcode; consider ``asyncio.to_thread`` if responsiveness matters.
    """
    try:
        m4b_path = tempfile.NamedTemporaryFile(delete=False, suffix=".m4b").name

        # Use ffmpeg directly for conversion (avoids pydub's 4GB limit).
        cmd = [
            'ffmpeg',
            '-y',              # Overwrite output
            '-i', mp3_path,    # Input file
            '-c:a', 'aac',     # Audio codec
            '-b:a', '128k',    # Audio bitrate
            '-f', 'ipod',      # M4B/M4A container
            m4b_path
        ]
        logger.info(f"Running ffmpeg conversion: {' '.join(cmd)}")
        result = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            timeout=3600  # 1 hour timeout for large files
        )

        if result.returncode != 0:
            logger.error(f"ffmpeg error: {result.stderr}")
            if os.path.exists(m4b_path):
                os.remove(m4b_path)
            return None

        logger.info(f"Converted to M4B: {m4b_path}")
        return m4b_path

    except FileNotFoundError:
        logger.error("ffmpeg not found. Please install ffmpeg to use M4B format.")
        return None
    except subprocess.TimeoutExpired:
        logger.error("ffmpeg conversion timed out")
        return None
    except Exception as e:
        logger.error(f"Failed to convert to M4B: {e}")
        return None


async def get_voices():
    """Return a mapping of display label -> edge-tts ShortName for all voices."""
    voices = await edge_tts.list_voices()
    return {f"{v['ShortName']} - {v['Locale']} ({v['Gender']})": v['ShortName'] for v in voices}


def format_time_remaining(seconds):
    """Format a duration in seconds as a short human-readable string."""
    if seconds < 60:
        return f"{int(seconds)}s"
    elif seconds < 3600:
        minutes = seconds / 60
        return f"{minutes:.1f}m"
    else:
        hours = seconds / 3600
        return f"{hours:.1f}h"


def calculate_eta(start_time, completed_items, total_items):
    """Estimate remaining time from average per-item throughput so far."""
    if completed_items == 0:
        return "Calculating..."
    elapsed_time = time.time() - start_time
    time_per_item = elapsed_time / completed_items
    remaining_items = total_items - completed_items
    remaining_time = time_per_item * remaining_items
    return format_time_remaining(remaining_time)


def estimate_text_duration(text):
    """Estimate speech duration in minutes from text length.

    Heuristic: English (space-separated) ~150 words/min; Chinese
    (few spaces) ~300 chars/min. The space ratio decides which model applies.
    """
    if not text:
        return 0

    space_count = text.count(' ')
    total_len = len(text)

    # If spaces are < 10% of length, assume non-space-separated (like Chinese).
    if space_count / total_len < 0.1:
        # Approx 300 chars per minute for Chinese.
        duration = total_len / 300
    else:
        # Approx 150 words per minute for English.
        word_count = len(text.split())
        duration = word_count / 150
    return duration


def split_text_by_paragraphs(text, max_duration_minutes=5, max_chars=500):
    """Split text into segments under both a duration and a character limit.

    Splits on blank-line paragraph boundaries first; paragraphs that are
    themselves too long are further split on sentence-ending punctuation
    (including Chinese 。!?). Returns a list of non-empty segments.
    """
    max_duration = max_duration_minutes
    estimated_duration = estimate_text_duration(text)
    logger.info(f"Checking segmentation: Duration={estimated_duration:.2f}m, Chars={len(text)}, Limit={max_duration}m/{max_chars}chars")

    if estimated_duration <= max_duration and len(text) <= max_chars:
        return [text]

    logger.info(f"Text exceeds limits. Splitting...")

    # Split by paragraphs first.
    paragraphs = text.split('\n\n')
    segments = []
    current_segment = ""

    for paragraph in paragraphs:
        paragraph_duration = estimate_text_duration(paragraph)
        # If a single paragraph is too long, split by sentences.
        # Regex includes Chinese punctuation.
        if paragraph_duration > max_duration or len(paragraph) > max_chars:
            sentences = re.split(r'([.!?。!?]+)', paragraph)
            # Re-attach delimiters to their sentences.
            real_sentences = []
            for i in range(0, len(sentences) - 1, 2):
                real_sentences.append(sentences[i] + sentences[i + 1])
            if len(sentences) % 2 == 1 and sentences[-1]:
                real_sentences.append(sentences[-1])

            for sentence in real_sentences:
                sentence = sentence.strip()
                if not sentence:
                    continue
                # Flush the current segment when adding this sentence would
                # exceed either the duration or the character limit.
                if (estimate_text_duration(current_segment + sentence) > max_duration
                        or len(current_segment + sentence) > max_chars) and current_segment:
                    segments.append(current_segment.strip())
                    current_segment = sentence
                else:
                    current_segment += sentence
        else:
            if (estimate_text_duration(current_segment + paragraph) > max_duration
                    or len(current_segment + paragraph) > max_chars) and current_segment:
                segments.append(current_segment.strip())
                current_segment = paragraph + "\n\n"
            else:
                current_segment += paragraph + "\n\n"

    if current_segment.strip():
        segments.append(current_segment.strip())

    logger.info(f"Split text into {len(segments)} segments.")
    return segments


async def generate_audio_segment(text_segment, voice_short_name, rate_str, volume_str, pitch_str, segment_index):
    """Synthesize one text segment to a temporary MP3 file; return its path.

    Raises ``gr.Error`` (after removing the temp file) if synthesis fails.
    """
    logger.info(f"Generating segment {segment_index}...")
    communicate = edge_tts.Communicate(text_segment, voice_short_name,
                                       rate=rate_str, volume=volume_str, pitch=pitch_str)

    # Save directly to a temporary file instead of holding audio in memory.
    tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=f"_seg{segment_index}.mp3")
    tmp_path = tmp_file.name
    tmp_file.close()

    try:
        await communicate.save(tmp_path)
    except Exception as e:
        logger.error(f"Error generating segment {segment_index} (Length: {len(text_segment)} chars): {e}")
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise gr.Error(f"Error generating segment {segment_index}: {e}")

    # Verify segment duration (best-effort; failure only logs).
    try:
        seg_audio = AudioSegment.from_mp3(tmp_path)
        duration_min = len(seg_audio) / 1000 / 60
        logger.info(f"Segment {segment_index} saved to temp file (Duration: {duration_min:.2f} min)")
    except Exception as e:
        logger.error(f"Error checking segment {segment_index} duration: {e}")

    return tmp_path


async def merge_audio_files(audio_paths):
    """Concatenate MP3 segment files into one file; return the merged path.

    Uses raw binary concatenation (valid for MPEG audio streams and avoids
    pydub's in-memory/WAV size limits). Each segment file is deleted after
    it has been appended.
    """
    if not audio_paths:
        return None

    logger.info(f"Merging {len(audio_paths)} audio segments...")

    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
        merged_path = tmp_file.name

    total_size = 0
    with open(merged_path, 'wb') as outfile:
        for i, audio_path in enumerate(audio_paths):
            try:
                with open(audio_path, 'rb') as infile:
                    data = infile.read()
                    outfile.write(data)
                    total_size += len(data)
                # Delete the temporary segment file after merging.
                os.remove(audio_path)
                logger.info(f"Merged and deleted segment {i+1}")
            except Exception as e:
                logger.error(f"Error merging segment {i+1}: {e}")

    logger.info(f"Merged audio saved to {merged_path} (Total size: {total_size / 1024 / 1024:.2f} MB)")
    return merged_path


async def text_to_speech_generator(text, voice, rate, volume, pitch,
                                   cleaning_options=None, output_format="mp3", output_filename=None):
    """Generate speech with detailed progress tracking.

    Yields progress tuples ``(percent, status_message, segment_info)`` while
    working, and a final tuple ``(audio_path, "Done", segment_info)`` (or
    ``(None, error_message, None)`` on invalid input).
    """
    if not text.strip():
        yield None, "Please enter text to convert.", None
        return
    if not voice:
        yield None, "Please select a voice.", None
        return

    # Apply text cleaning if enabled.
    if cleaning_options and cleaning_options.get('enable_cleaning', False):
        yield 0, "Cleaning text...", None
        text = TextCleaner.clean_text(text, cleaning_options)

        if cleaning_options.get('save_cleaned', False):
            # Create a filename based on timestamp.
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"text_{timestamp}.txt"
            saved_path = TextCleaner.save_cleaned_text(text, filename)
            if saved_path:
                logger.info(f"Saved cleaned text to {saved_path}")

        if not text.strip():
            yield None, "Text cleaning resulted in empty text.", None
            return

    voice_short_name = voice.split(" - ")[0]
    # FIX: Gradio sliders may deliver floats (e.g. 0.0); '+d' formatting
    # requires an int, so coerce first.
    rate_str = f"{int(rate):+d}%"
    volume_str = f"{int(volume):+d}%"
    pitch_str = f"{int(pitch):+d}Hz"

    # Check if text is long enough to need segmentation.
    estimated_duration = estimate_text_duration(text)
    yield 0, "Starting text processing...", None
    logger.info(f"Starting TTS for text with estimated duration: {estimated_duration:.2f}m")

    # Generate output filename with timestamp.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    if output_filename:
        final_filename = f"{output_filename}_{timestamp}"
    else:
        final_filename = f"audio_{timestamp}"

    final_audio_path = None

    if estimated_duration > 15:
        # Longer than 15 minutes: split into segments.
        segments = split_text_by_paragraphs(text)
        total_segments = len(segments)

        segment_info = f"Text split into {total_segments} segments. Total estimated duration: {estimated_duration:.1f} min"
        yield 5, segment_info, segment_info

        if total_segments > 1:
            # Generate audio for each segment with progress tracking.
            audio_paths = []
            start_time = time.time()

            for i, segment in enumerate(segments):
                if segment.strip():
                    segment_duration = estimate_text_duration(segment)
                    progress = 10 + (80 * i / total_segments)  # 10% to 90%
                    eta = calculate_eta(start_time, i, total_segments)
                    status_msg = (
                        f"Generating segment {i+1}/{total_segments}...\n"
                        f"Segment duration: {segment_duration:.1f} min\n"
                        f"ETA: {eta}"
                    )
                    logger.info(f"Progress: {status_msg.replace(chr(10), ', ')}")
                    yield progress, status_msg, segment_info

                    seg_path = await generate_audio_segment(
                        segment, voice_short_name, rate_str, volume_str, pitch_str, i + 1
                    )
                    audio_paths.append(seg_path)

            yield 90, "Merging audio files...", segment_info

            merged_audio_path = await merge_audio_files(audio_paths)
            final_audio_path = merged_audio_path

            # Convert to M4B if requested.
            if output_format == "m4b" and merged_audio_path:
                yield 95, "Converting to M4B format...", segment_info
                m4b_path = await convert_to_m4b(merged_audio_path, final_filename)
                if m4b_path:
                    os.remove(merged_audio_path)
                    final_audio_path = m4b_path

            # Rename to the final filename.
            if final_audio_path:
                ext = ".m4b" if output_format == "m4b" else ".mp3"
                new_path = os.path.join(os.path.dirname(final_audio_path), f"{final_filename}{ext}")
                shutil.move(final_audio_path, new_path)
                final_audio_path = new_path

            yield 100, "Audio generation complete! ✅", segment_info
            yield final_audio_path, "Done", segment_info
            return

    # For short texts (or a single segment), use the direct method.
    yield 50, "Generating audio...", None
    logger.info("Generating single segment audio...")
    communicate = edge_tts.Communicate(text, voice_short_name,
                                       rate=rate_str, volume=volume_str, pitch=pitch_str)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
        tmp_path = tmp_file.name
    await communicate.save(tmp_path)
    final_audio_path = tmp_path

    # Convert to M4B if requested.
    if output_format == "m4b":
        yield 80, "Converting to M4B format...", None
        m4b_path = await convert_to_m4b(tmp_path, final_filename)
        if m4b_path:
            os.remove(tmp_path)
            final_audio_path = m4b_path

    # Rename to the final filename.
    if final_audio_path:
        ext = ".m4b" if output_format == "m4b" else ".mp3"
        new_path = os.path.join(os.path.dirname(final_audio_path), f"{final_filename}{ext}")
        shutil.move(final_audio_path, new_path)
        final_audio_path = new_path

    logger.info(f"Audio generated at {final_audio_path}")
    yield 100, "Audio generation complete! ✅", None
    yield final_audio_path, "Done", None


async def tts_interface(text, uploaded_file, voice, rate, volume, pitch, output_format,
                        enable_cleaning, save_cleaned, clean_urls, clean_html, clean_markdown,
                        clean_ads, fix_enc, tidy_ws, del_gutenberg, del_special, wetext_norm):
    """Gradio event handler: drives the TTS generator and maps its yields to
    the four UI outputs (audio, progress markdown, status, segment details)."""
    # Derive the output filename from the uploaded file, if any.
    output_filename = None
    if uploaded_file is not None:
        output_filename = os.path.splitext(os.path.basename(uploaded_file))[0]
        logger.info(f"Using filename from uploaded file: {output_filename}")

    if not text.strip():
        yield None, gr.update(visible=True, value="Please enter text or upload a file."), "No text provided", gr.update(visible=False)
        return
    if not voice:
        yield None, gr.update(visible=False), "Please select a voice.", gr.update(visible=False)
        return

    # Prepare cleaning options.
    cleaning_options = {
        'enable_cleaning': enable_cleaning,
        'save_cleaned': save_cleaned,
        'remove_urls': clean_urls,
        'remove_html': clean_html,
        'remove_markdown': clean_markdown,
        'filter_ads': clean_ads,
        'fix_encoding': fix_enc,
        'tidy_whitespace': tidy_ws,
        'remove_gutenberg': del_gutenberg,
        'remove_special_chars': del_special,
        'wetext_normalization': wetext_norm
    }

    # FIX: removed a dead duplicate TextCleaner.clean_text() call here whose
    # result was never used — the generator performs cleaning itself.

    # Reset UI.
    yield None, gr.update(value="Starting...", visible=True), "Initializing...", gr.update(visible=False)

    async for result in text_to_speech_generator(text, voice, rate, volume, pitch,
                                                 cleaning_options, output_format, output_filename):
        if isinstance(result, tuple) and len(result) == 3:
            progress_val, status_msg, segment_info = result

            if isinstance(progress_val, (int, float)):
                # Progress update.
                segment_update = gr.update(value=segment_info, visible=True) if segment_info else gr.update(visible=False)
                yield None, gr.update(value=status_msg, visible=True), status_msg, segment_update
            else:
                # Final result: (path, msg, info).
                audio_path = progress_val
                yield audio_path, gr.update(value="Complete!", visible=True), "Generation Complete", gr.update(visible=True)


async def create_demo():
    """Build and return the Gradio Blocks UI."""
    voices = await get_voices()

    description = """
Convert text to speech using Microsoft Edge TTS. Adjust speech rate and pitch: 0 is default, positive values increase, negative values decrease.

🎥 **Exciting News: Introducing our Text-to-Video Converter!** 🎥

Take your content creation to the next level with our cutting-edge Text-to-Video Converter! Transform your words into stunning, professional-quality videos in just a few clicks.

✨ Features:
• Convert text to engaging videos with customizable visuals
• Choose from 40+ languages and 300+ voices
• Perfect for creating audiobooks, storytelling, and language learning materials
• Ideal for educators, content creators, and language enthusiasts

📝 **Long Text Support**: Texts longer than 15 minutes will be **automatically segmented** into smaller chunks for processing and then **merged back** into a single high-quality audio file. This ensures stability and allows for unlimited text length!
"""

    default_voice = ""
    for voice_key in voices.keys():
        if "XiaoxiaoNeural" in voice_key:
            default_voice = voice_key
            break

    def _analysis_text(text):
        """Format the '**Text Analysis**' markdown for a given text.

        Shared by the text-change and file-upload handlers so both always
        report the same statistics.
        """
        duration = estimate_text_duration(text)
        word_count = len(text.split())
        char_count = len(text)
        if duration > 15:
            segment_count = len(split_text_by_paragraphs(text))
            return (f"**Text Analysis**: {word_count} words, {char_count} characters, "
                    f"~{duration:.1f} minutes speech time, {segment_count} segments will be generated")
        return (f"**Text Analysis**: {word_count} words, {char_count} characters, "
                f"~{duration:.1f} minutes speech time")

    with gr.Blocks(title="Edge TTS Text-to-Speech") as demo:
        gr.Markdown("# Edge TTS Text-to-Speech")
        gr.Markdown(description)

        with gr.Row():
            with gr.Column():
                text_input = gr.Textbox(label="Input Text", lines=8,
                                        placeholder="Enter your text here... Long texts will be automatically segmented if they exceed 15 minutes of speech time.")

                # File upload component.
                file_upload = gr.File(
                    label="Or Upload File (TXT/EPUB)",
                    file_types=[".txt", ".epub"],
                    type="filepath"
                )

                # Text analysis info.
                text_info = gr.Markdown("**Text Analysis**: Enter text or upload a file to see estimated duration and segment count", visible=True)

                with gr.Accordion("Text Cleaning Settings", open=True):
                    with gr.Row():
                        enable_cleaning = gr.Checkbox(label="Enable Text Cleaning", value=True)
                        save_cleaned = gr.Checkbox(label="Save Cleaned Text File", value=True)
                    with gr.Group(visible=True) as cleaning_options_group:
                        with gr.Row():
                            clean_urls = gr.Checkbox(label="Remove URLs", value=True)
                            clean_html = gr.Checkbox(label="Remove HTML", value=True)
                        with gr.Row():
                            clean_markdown = gr.Checkbox(label="Remove Markdown", value=True)
                            clean_ads = gr.Checkbox(label="Filter Ads", value=True)
                        with gr.Row():
                            fix_enc = gr.Checkbox(label="Fix Encoding", value=True)
                            tidy_ws = gr.Checkbox(label="Tidy Whitespace", value=True)
                        with gr.Row():
                            del_gutenberg = gr.Checkbox(label="Remove Project Gutenberg", value=True)
                            del_special = gr.Checkbox(label="Remove Special Characters", value=True)
                        with gr.Row():
                            wetext_norm = gr.Checkbox(label="Enable WeText Normalization", value=True)

                def toggle_options(enabled):
                    return gr.update(visible=enabled)

                enable_cleaning.change(fn=toggle_options, inputs=[enable_cleaning], outputs=[cleaning_options_group])

                voice_dropdown = gr.Dropdown(choices=[""] + list(voices.keys()), label="Select Voice", value=default_voice)

                with gr.Row():
                    rate_slider = gr.Slider(minimum=-50, maximum=50, value=0, label="Speech Rate (%)", step=1)
                    volume_slider = gr.Slider(minimum=-50, maximum=50, value=0, label="Speech Volume (%)", step=1)
                    pitch_slider = gr.Slider(minimum=-20, maximum=20, value=0, label="Pitch (Hz)", step=1)

                # Output format selection.
                output_format = gr.Radio(
                    choices=["mp3", "m4b"],
                    value="mp3",
                    label="Output Format",
                    info="MP3 is default. M4B is audiobook format (requires ffmpeg)."
                )

                generate_btn = gr.Button("Generate Audio", variant="primary")

            with gr.Column():
                audio_output = gr.Audio(label="Generated Audio", type="filepath")

                # Progress and status display.
                with gr.Group():
                    gr.Markdown("### 📊 Processing Progress")
                    progress_info = gr.Markdown("Ready, click Generate to start...", visible=True)

                # Processing details.
                with gr.Accordion("🔍 Processing Details", open=True) as processing_details:
                    status_output = gr.Markdown("Waiting...", visible=True)

                # Segment information display.
                with gr.Accordion("📋 Segment Information", open=True) as segment_info:
                    segment_details = gr.Markdown("Segment details will appear here for long texts", visible=True)

        gr.Markdown("Experience the power of Edge TTS for text-to-speech conversion, and explore our advanced Text-to-Video Converter for even more creative possibilities!")

        # Text analysis handler (typed text, falling back to the uploaded file).
        def analyze_text(text, uploaded_file):
            if uploaded_file is not None:
                file_text, filename = parse_uploaded_file(uploaded_file)
                if file_text:
                    text = file_text
                else:
                    return f"**Text Analysis**: Failed to parse uploaded file"

            if not text or not text.strip():
                return "**Text Analysis**: Enter text or upload a file to see estimated duration and segment count"

            return _analysis_text(text)

        # File upload handler — loads the file into the text box and analyzes it.
        def on_file_upload(uploaded_file):
            if uploaded_file is None:
                return gr.update(), "**Text Analysis**: Enter text or upload a file to see estimated duration and segment count"

            file_text, filename = parse_uploaded_file(uploaded_file)
            if file_text:
                return gr.update(value=file_text), _analysis_text(file_text)
            else:
                return gr.update(), "**Text Analysis**: Failed to parse uploaded file"

        # Update text analysis when text changes.
        text_input.change(
            fn=analyze_text,
            inputs=[text_input, file_upload],
            outputs=[text_info]
        )

        # Update text box and analysis when a file is uploaded.
        file_upload.change(
            fn=on_file_upload,
            inputs=[file_upload],
            outputs=[text_input, text_info]
        )

        generate_btn.click(
            fn=tts_interface,
            inputs=[
                text_input, file_upload, voice_dropdown, rate_slider, volume_slider, pitch_slider, output_format,
                enable_cleaning, save_cleaned,
                clean_urls, clean_html, clean_markdown, clean_ads,
                fix_enc, tidy_ws, del_gutenberg, del_special, wetext_norm
            ],
            outputs=[audio_output, progress_info, status_output, segment_details]
        )

    return demo


async def main():
    """Build the demo and launch the Gradio server."""
    demo = await create_demo()
    demo.queue(default_concurrency_limit=5)
    demo.launch(show_api=False)


if __name__ == "__main__":
    asyncio.run(main())