Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import edge_tts | |
| import asyncio | |
| import tempfile | |
| import os | |
| import re | |
| import shutil | |
| from pydub import AudioSegment | |
| import math | |
| import time | |
| from datetime import datetime, timedelta | |
| import logging | |
| from text_cleaning import TextCleaner | |
# Configure logging before anything else can log. The module-level
# logging.warning() helper installs a default root handler when none exists,
# which would turn a later basicConfig() call into a no-op and silently drop
# the INFO level and the custom format below.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# EPUB parsing (optional dependency)
try:
    import ebooklib
    from ebooklib import epub
    from bs4 import BeautifulSoup
    EPUB_SUPPORT = True
except ImportError:
    EPUB_SUPPORT = False
    logger.warning("ebooklib or beautifulsoup4 not installed. EPUB support disabled.")

# Encoding detection (optional dependency)
try:
    import chardet
    CHARDET_SUPPORT = True
except ImportError:
    CHARDET_SUPPORT = False
    logger.warning("chardet not installed. Encoding detection will use fallback method.")
def detect_file_encoding(file_path):
    """Guess the encoding of a file with chardet.

    Returns the detected codec name, with a few common aliases normalised,
    or None when chardet is not installed or reports no encoding.
    """
    if not CHARDET_SUPPORT:
        # No detector available: the caller falls back to trying common encodings
        return None

    with open(file_path, 'rb') as handle:
        payload = handle.read()

    guess = chardet.detect(payload)
    detected = guess['encoding']
    score = guess['confidence']
    logger.info(f"Detected encoding: {detected} (confidence: {score:.2%})")

    if not detected:
        return detected

    # Map common aliases to the codec names used elsewhere in this module
    aliases = {
        'gb2312': 'gbk',  # GBK is superset of GB2312
        'gb18030': 'gb18030',
        'ascii': 'utf-8',  # ASCII is subset of UTF-8
        'iso-8859-1': 'latin-1',
        'windows-1252': 'cp1252',
    }
    return aliases.get(detected.lower(), detected)
def read_text_file_with_encoding(file_path):
    """Read a text file, trying the detected encoding first, then common ones.

    Args:
        file_path: Path of the file to read.

    Returns:
        The decoded text with any leading byte-order mark removed, or None
        when none of the candidate encodings could decode the file.
    """
    # First try chardet detection
    detected_encoding = detect_file_encoding(file_path)

    encodings_to_try = []
    if detected_encoding:
        encodings_to_try.append(detected_encoding)

    # Add common encodings as fallback
    # Common encodings for Chinese: UTF-8, GBK, GB18030, Big5
    # Common encodings for English/Western: UTF-8, Latin-1, CP1252
    encodings_to_try.extend([
        # utf-8-sig decodes UTF-8 with or without a BOM, so it goes before
        # plain utf-8, which would succeed but keep the BOM as U+FEFF.
        'utf-8-sig',
        'utf-8',
        'gbk',        # Chinese (simplified)
        'gb18030',    # Chinese (extended)
        'big5',       # Chinese (traditional)
        'utf-16',
        # NOTE: latin-1 maps every byte value, so it always decodes; the
        # entries listed after it are effectively never reached.
        'latin-1',    # Western European
        'cp1252',     # Windows Western
        'shift_jis',  # Japanese
        'euc-kr',     # Korean
    ])

    # Remove duplicates (case-insensitively) while preserving order
    seen = set()
    unique_encodings = []
    for enc in encodings_to_try:
        key = enc.lower() if enc else None
        if key and key not in seen:
            seen.add(key)
            unique_encodings.append(enc)

    last_error = None
    for encoding in unique_encodings:
        try:
            with open(file_path, 'r', encoding=encoding) as f:
                text = f.read()
        except (UnicodeDecodeError, LookupError) as e:
            # LookupError covers codec names chardet reports but Python lacks
            last_error = e
            logger.debug(f"Failed to decode with {encoding}: {e}")
            continue

        # Validate: check if text contains too many replacement characters
        if text.count('\ufffd') > len(text) * 0.1:  # More than 10% replacement chars
            logger.debug(f"Encoding {encoding} produced too many replacement characters, trying next...")
            continue

        # The detected encoding may be plain UTF-8 for a BOM-prefixed file;
        # drop the BOM so it is not passed on as part of the text.
        if text.startswith('\ufeff'):
            text = text[1:]

        logger.info(f"Successfully read file with encoding: {encoding}")
        return text

    logger.error(f"Failed to decode file with any encoding. Last error: {last_error}")
    return None
def parse_uploaded_file(file_path):
    """Extract the text of an uploaded .txt or .epub file.

    Returns a ``(text, filename)`` pair where ``filename`` is the base name
    without extension. ``text`` is None when the file could not be read, and
    both values are None for a missing path or an unsupported extension.
    """
    if file_path is None:
        return None, None

    stem, suffix = os.path.splitext(os.path.basename(file_path))
    suffix = suffix.lower()

    if suffix == '.txt':
        content = read_text_file_with_encoding(file_path)
        if not content:
            logger.error(f"Failed to decode TXT file: {stem}")
            return None, stem
        logger.info(f"Parsed TXT file: {stem}, {len(content)} chars")
        return content, stem

    if suffix != '.epub':
        return None, None

    if not EPUB_SUPPORT:
        logger.error("EPUB support not available")
        return None, stem

    try:
        book = epub.read_epub(file_path)
        # Keep only document items and reduce each one to its plain text
        chapters = [
            BeautifulSoup(item.get_content(), 'html.parser').get_text(separator='\n')
            for item in book.get_items()
            if item.get_type() == ebooklib.ITEM_DOCUMENT
        ]
        content = '\n\n'.join(chapters)
        logger.info(f"Parsed EPUB file: {stem}, {len(content)} chars")
        return content, stem
    except Exception as e:
        logger.error(f"Failed to parse EPUB: {e}")
        return None, stem
async def convert_to_m4b(mp3_path, output_filename):
    """Convert an MP3 file to M4B by invoking ffmpeg directly.

    ffmpeg runs in a worker thread so the event loop stays responsive during
    long conversions.

    Args:
        mp3_path: Path of the source MP3 file.
        output_filename: Accepted for call compatibility; the result is
            written to a fresh temporary file and this value is not used.

    Returns:
        Path of the new temporary ``.m4b`` file, or None when ffmpeg is
        missing, exits with an error, times out, or anything else fails. The
        temporary output file is removed on every failure path.
    """
    import subprocess

    m4b_path = None
    converted = False
    try:
        # Reserve a unique output path and close the handle immediately so
        # ffmpeg can overwrite the file.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".m4b") as tmp_file:
            m4b_path = tmp_file.name

        # Use ffmpeg directly for conversion (avoids pydub's 4GB limit)
        cmd = [
            'ffmpeg', '-y',   # Overwrite output
            '-i', mp3_path,   # Input file
            '-c:a', 'aac',    # Audio codec
            '-b:a', '128k',   # Audio bitrate
            '-f', 'ipod',     # M4B/M4A format
            m4b_path
        ]
        logger.info(f"Running ffmpeg conversion: {' '.join(cmd)}")

        def run_ffmpeg():
            return subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                errors="replace",  # ffmpeg output may not be valid in the locale encoding
                timeout=3600       # 1 hour timeout for large files
            )

        result = await asyncio.get_running_loop().run_in_executor(None, run_ffmpeg)
        if result.returncode != 0:
            logger.error(f"ffmpeg error: {result.stderr}")
            return None

        logger.info(f"Converted to M4B: {m4b_path}")
        converted = True
        return m4b_path
    except FileNotFoundError:
        logger.error("ffmpeg not found. Please install ffmpeg to use M4B format.")
        return None
    except subprocess.TimeoutExpired:
        logger.error("ffmpeg conversion timed out")
        return None
    except Exception as e:
        logger.error(f"Failed to convert to M4B: {e}")
        return None
    finally:
        # Do not leave a partial or empty output file behind on failure
        if not converted and m4b_path and os.path.exists(m4b_path):
            try:
                os.remove(m4b_path)
            except OSError as e:
                logger.warning(f"Could not remove temporary file {m4b_path}: {e}")
async def get_voices():
    """Fetch the Edge TTS voice list as a label-to-ShortName mapping.

    Each key has the form ``"<ShortName> - <Locale> (<Gender>)"`` and maps to
    the voice's ShortName.
    """
    catalogue = {}
    for entry in await edge_tts.list_voices():
        label = f"{entry['ShortName']} - {entry['Locale']} ({entry['Gender']})"
        catalogue[label] = entry['ShortName']
    return catalogue
def format_time_remaining(seconds):
    """Render a duration in seconds as a short label such as 42s, 3.5m or 1.2h."""
    # Under a minute: whole seconds
    if seconds < 60:
        return f"{int(seconds)}s"
    # Under an hour: minutes with one decimal
    if seconds < 3600:
        return f"{seconds / 60:.1f}m"
    # Otherwise: hours with one decimal
    return f"{seconds / 3600:.1f}h"
def calculate_eta(start_time, completed_items, total_items):
    """Estimate the time left from the average time per completed item.

    Returns "Calculating..." until at least one item has completed, otherwise
    the remaining time formatted by ``format_time_remaining``.
    """
    if completed_items == 0:
        return "Calculating..."

    average_per_item = (time.time() - start_time) / completed_items
    items_left = total_items - completed_items
    return format_time_remaining(average_per_item * items_left)
def estimate_text_duration(text):
    """Estimate how many minutes of speech *text* will produce.

    Heuristic: when ASCII spaces make up at least 10% of the characters the
    text is treated as space-separated (about 150 words per minute); otherwise
    it is treated as unsegmented script such as Chinese (about 300 characters
    per minute). Empty or missing text yields 0.
    """
    if not text:
        return 0

    length = len(text)
    space_ratio = text.count(' ') / length

    if space_ratio < 0.1:
        # Few spaces: count characters
        return length / 300

    # Space-separated: count words
    return len(text.split()) / 150
def split_text_by_paragraphs(text, max_duration_minutes=5, max_chars=500):
    """Split text into segments that stay within a duration and a length limit.

    Paragraphs (separated by a blank line) are packed greedily into segments.
    A paragraph that is too large on its own is split into sentences, which
    are packed the same way with their original spacing preserved.

    Args:
        text: The text to split.
        max_duration_minutes: Maximum estimated speech time per segment.
        max_chars: Maximum number of characters per segment.

    Returns:
        A list of segment strings. If the whole text already fits, the list
        contains the text unchanged. A single sentence that exceeds the limits
        by itself is kept whole rather than cut mid-sentence.
    """
    max_duration = max_duration_minutes
    estimated_duration = estimate_text_duration(text)
    logger.info(f"Checking segmentation: Duration={estimated_duration:.2f}m, Chars={len(text)}, Limit={max_duration}m/{max_chars}chars")
    if estimated_duration <= max_duration and len(text) <= max_chars:
        return [text]
    logger.info(f"Text exceeds limits. Splitting...")

    def exceeds_limits(candidate):
        return estimate_text_duration(candidate) > max_duration or len(candidate) > max_chars

    segments = []
    current_segment = ""
    for paragraph in text.split('\n\n'):
        if exceeds_limits(paragraph):
            # Paragraph too long: split after runs of sentence-ending
            # punctuation, ASCII and full-width CJK forms alike.
            parts = re.split(r'([.!?。！？]+)', paragraph)
            # Re-attach each delimiter run to the sentence it terminates
            sentences = [parts[i] + parts[i + 1] for i in range(0, len(parts) - 1, 2)]
            if len(parts) % 2 == 1 and parts[-1]:
                sentences.append(parts[-1])
            # Sentences keep their leading whitespace so that joining them
            # again does not glue words together ("end.Next").
            pieces = [(sentence, "") for sentence in sentences if sentence.strip()]
            if pieces:
                # Keep the paragraph break after the last sentence
                pieces[-1] = (pieces[-1][0], "\n\n")
        else:
            pieces = [(paragraph, "\n\n")]

        for body, trailer in pieces:
            # Start a new segment when adding this piece would break a limit
            if current_segment and exceeds_limits(current_segment + body):
                segments.append(current_segment.strip())
                current_segment = body + trailer
            else:
                current_segment += body + trailer

    if current_segment.strip():
        segments.append(current_segment.strip())
    logger.info(f"Split text into {len(segments)} segments.")
    return segments
| import io | |
async def generate_audio_segment(text_segment, voice_short_name, rate_str, volume_str, pitch_str, segment_index):
    """Synthesize one text segment to a temporary MP3 file.

    Args:
        text_segment: Text to synthesize.
        voice_short_name: Edge TTS voice ShortName.
        rate_str: Rate adjustment string passed to edge_tts.
        volume_str: Volume adjustment string passed to edge_tts.
        pitch_str: Pitch adjustment string passed to edge_tts.
        segment_index: Segment number, used in logs and the temp file suffix.

    Returns:
        Path of the temporary MP3 file holding the synthesized audio.

    Raises:
        gr.Error: If synthesis fails; the temporary file is removed first and
            the original exception is chained as the cause.
    """
    logger.info(f"Generating segment {segment_index}...")
    communicate = edge_tts.Communicate(text_segment, voice_short_name, rate=rate_str, volume=volume_str, pitch=pitch_str)

    # Save directly to temporary file instead of memory
    with tempfile.NamedTemporaryFile(delete=False, suffix=f"_seg{segment_index}.mp3") as tmp_file:
        tmp_path = tmp_file.name

    try:
        await communicate.save(tmp_path)
    except Exception as e:
        logger.error(f"Error generating segment {segment_index} (Length: {len(text_segment)} chars): {e}")
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise gr.Error(f"Error generating segment {segment_index}: {e}") from e

    # Best-effort duration check, for logging only. Decoding the MP3 is
    # blocking work, so it runs in a worker thread to keep the event loop free.
    try:
        loop = asyncio.get_running_loop()
        seg_audio = await loop.run_in_executor(None, AudioSegment.from_mp3, tmp_path)
        duration_min = len(seg_audio) / 1000 / 60
        logger.info(f"Segment {segment_index} saved to temp file (Duration: {duration_min:.2f} min)")
    except Exception as e:
        logger.error(f"Error checking segment {segment_index} duration: {e}")

    return tmp_path
async def merge_audio_files(audio_paths):
    """Merge MP3 segment files into one file by binary concatenation.

    Each segment file is deleted once it has been appended. Merging is
    best-effort: a segment that cannot be read is logged and skipped.

    Args:
        audio_paths: Paths of the segment files, in playback order.

    Returns:
        Path of the merged temporary MP3 file, or None when *audio_paths* is
        empty.
    """
    if not audio_paths:
        return None

    logger.info(f"Merging {len(audio_paths)} audio segments...")

    # Create output file
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
        merged_path = tmp_file.name

    # Binary concatenation of MP3 files (avoids WAV size limit)
    with open(merged_path, 'wb') as outfile:
        for i, audio_path in enumerate(audio_paths):
            try:
                with open(audio_path, 'rb') as infile:
                    # Stream in chunks instead of loading the whole segment
                    shutil.copyfileobj(infile, outfile)
            except Exception as e:
                logger.error(f"Error merging segment {i+1}: {e}")
                continue

            # Delete only after the input handle is closed: removing an open
            # file fails on Windows and would be misreported as a merge error.
            try:
                os.remove(audio_path)
            except OSError as e:
                logger.warning(f"Merged segment {i+1} but could not delete {audio_path}: {e}")
            else:
                logger.info(f"Merged and deleted segment {i+1}")
        total_size = outfile.tell()

    logger.info(f"Merged audio saved to {merged_path} (Total size: {total_size / 1024 / 1024:.2f} MB)")
    return merged_path
async def _finalize_audio(audio_path, output_format, final_filename):
    """Convert to M4B when requested, then rename the file to its final name."""
    if not audio_path:
        return audio_path

    if output_format == "m4b":
        m4b_path = await convert_to_m4b(audio_path, final_filename)
        if m4b_path:
            os.remove(audio_path)
            audio_path = m4b_path
        else:
            logger.warning("M4B conversion failed; keeping MP3 output.")

    # Take the extension from the file that actually exists, so an MP3 kept
    # after a failed conversion is not mislabelled as .m4b.
    ext = os.path.splitext(audio_path)[1] or ".mp3"
    new_path = os.path.join(os.path.dirname(audio_path), f"{final_filename}{ext}")
    shutil.move(audio_path, new_path)
    return new_path


async def text_to_speech_generator(text, voice, rate, volume, pitch, cleaning_options=None, output_format="mp3", output_filename=None):
    """Generate speech for *text*, yielding progress updates along the way.

    Every yielded item is a 3-tuple ``(first, status_message, segment_info)``:

    * progress update: ``first`` is a number from 0 to 100;
    * validation failure: ``first`` is None and the message explains why,
      after which the generator stops;
    * final result: ``first`` is the path of the generated audio file (or None
      if no audio was produced) and the message is ``"Done"``.

    Args:
        text: Text to synthesize.
        voice: Voice label; the part before the first " - " is used as the
            Edge TTS ShortName.
        rate: Speech rate adjustment in percent.
        volume: Volume adjustment in percent.
        pitch: Pitch adjustment in Hz.
        cleaning_options: Optional dict of TextCleaner options; cleaning runs
            only when its 'enable_cleaning' entry is true.
        output_format: "mp3" or "m4b".
        output_filename: Optional base name for the output file.
    """
    if not text or not text.strip():
        yield None, "Please enter text to convert.", None
        return
    if not voice:
        yield None, "Please select a voice.", None
        return

    # Apply text cleaning if enabled
    if cleaning_options and cleaning_options.get('enable_cleaning', False):
        yield 0, "Cleaning text...", None
        text = TextCleaner.clean_text(text, cleaning_options)
        if cleaning_options.get('save_cleaned', False):
            # Create a filename based on timestamp
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            filename = f"text_{timestamp}.txt"
            saved_path = TextCleaner.save_cleaned_text(text, filename)
            if saved_path:
                logger.info(f"Saved cleaned text to {saved_path}")
        if not text.strip():
            yield None, "Text cleaning resulted in empty text.", None
            return

    voice_short_name = voice.split(" - ")[0]
    # int() guards against sliders delivering floats, which ':+d' rejects
    rate_str = f"{int(rate):+d}%"
    volume_str = f"{int(volume):+d}%"
    pitch_str = f"{int(pitch):+d}Hz"

    # Check if text is too long and needs segmentation
    estimated_duration = estimate_text_duration(text)
    yield 0, "Starting text processing...", None
    logger.info(f"Starting TTS for text with estimated duration: {estimated_duration:.2f}m")

    # Generate output filename with timestamp
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    if output_filename:
        final_filename = f"{output_filename}_{timestamp}"
    else:
        final_filename = f"audio_{timestamp}"

    if estimated_duration > 15:  # If longer than 15 minutes, split into segments
        segments = split_text_by_paragraphs(text)
        total_segments = len(segments)
        segment_info = f"Text split into {total_segments} segments. Total estimated duration: {estimated_duration:.1f} min"
        yield 5, segment_info, segment_info

        if total_segments > 1:
            # Generate audio for each segment with progress tracking
            segment_paths = []
            start_time = time.time()
            try:
                for i, segment in enumerate(segments):
                    if not segment.strip():
                        continue
                    segment_duration = estimate_text_duration(segment)
                    progress = 10 + (80 * i / total_segments)  # 10% to 90%
                    eta = calculate_eta(start_time, i, total_segments)
                    status_msg = (
                        f"Generating segment {i+1}/{total_segments}...\n"
                        f"Segment duration: {segment_duration:.1f} min\n"
                        f"ETA: {eta}"
                    )
                    logger.info(f"Progress: {status_msg.replace(chr(10), ', ')}")
                    yield progress, status_msg, segment_info

                    segment_path = await generate_audio_segment(
                        segment, voice_short_name, rate_str, volume_str, pitch_str, i+1
                    )
                    segment_paths.append(segment_path)

                yield 90, "Merging audio files...", segment_info
                merged_audio_path = await merge_audio_files(segment_paths)
            finally:
                # Remove segment files still on disk: after a successful merge
                # there are none, but a failed or abandoned run leaves some.
                for leftover in segment_paths:
                    if os.path.exists(leftover):
                        try:
                            os.remove(leftover)
                        except OSError as e:
                            logger.warning(f"Could not remove temporary file {leftover}: {e}")

            # Convert to M4B if requested, then rename to the final filename
            if output_format == "m4b" and merged_audio_path:
                yield 95, "Converting to M4B format...", segment_info
            final_audio_path = await _finalize_audio(merged_audio_path, output_format, final_filename)

            yield 100, "Audio generation complete! ✅", segment_info
            yield final_audio_path, "Done", segment_info
            return

    # For short texts or single segment, generate in one request
    yield 50, "Generating audio...", None
    logger.info("Generating single segment audio...")
    communicate = edge_tts.Communicate(text, voice_short_name, rate=rate_str, volume=volume_str, pitch=pitch_str)
    with tempfile.NamedTemporaryFile(delete=False, suffix=".mp3") as tmp_file:
        tmp_path = tmp_file.name
    try:
        await communicate.save(tmp_path)
    except Exception:
        # Do not leave the empty temp file behind; the error itself propagates
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise

    # Convert to M4B if requested, then rename to the final filename
    if output_format == "m4b":
        yield 80, "Converting to M4B format...", None
    final_audio_path = await _finalize_audio(tmp_path, output_format, final_filename)

    logger.info(f"Audio generated at {final_audio_path}")
    yield 100, "Audio generation complete! ✅", None
    yield final_audio_path, "Done", None
async def tts_interface(text, uploaded_file, voice, rate, volume, pitch, output_format,
                        enable_cleaning, save_cleaned, clean_urls, clean_html,
                        clean_markdown, clean_ads, fix_enc, tidy_ws, del_gutenberg,
                        del_special, wetext_norm):
    """Gradio handler: run TTS and stream progress to the UI.

    Yields 4-tuples of updates for the audio output, the progress Markdown,
    the status Markdown and the segment-details Markdown.

    Args:
        text: Text from the input box.
        uploaded_file: Path of the uploaded file, or None; only its base name
            is used here, to name the output file.
        voice: Selected voice label.
        rate: Speech rate adjustment in percent.
        volume: Volume adjustment in percent.
        pitch: Pitch adjustment in Hz.
        output_format: "mp3" or "m4b".
        enable_cleaning, save_cleaned, clean_urls, clean_html, clean_markdown,
        clean_ads, fix_enc, tidy_ws, del_gutenberg, del_special, wetext_norm:
            Checkbox values forwarded as the text-cleaning options.
    """
    # Get output filename from uploaded file (if any)
    output_filename = None
    if uploaded_file is not None:
        output_filename = os.path.splitext(os.path.basename(uploaded_file))[0]
        logger.info(f"Using filename from uploaded file: {output_filename}")

    if not text or not text.strip():
        yield None, gr.update(visible=True, value="Please enter text or upload a file."), "No text provided", gr.update(visible=False)
        return
    if not voice:
        yield None, gr.update(visible=False), "Please select a voice.", gr.update(visible=False)
        return

    # Prepare cleaning options. The generator performs the cleaning itself,
    # so the text is not cleaned a second time here.
    cleaning_options = {
        'enable_cleaning': enable_cleaning,
        'save_cleaned': save_cleaned,
        'remove_urls': clean_urls,
        'remove_html': clean_html,
        'remove_markdown': clean_markdown,
        'filter_ads': clean_ads,
        'fix_encoding': fix_enc,
        'tidy_whitespace': tidy_ws,
        'remove_gutenberg': del_gutenberg,
        'remove_special_chars': del_special,
        'wetext_normalization': wetext_norm
    }

    # Reset UI
    yield None, gr.update(value="Starting...", visible=True), "Initializing...", gr.update(visible=False)

    async for result in text_to_speech_generator(text, voice, rate, volume, pitch, cleaning_options, output_format, output_filename):
        if not (isinstance(result, tuple) and len(result) == 3):
            continue
        progress_val, status_msg, segment_info = result

        if isinstance(progress_val, (int, float)):
            # It's a progress update
            segment_update = gr.update(value=segment_info, visible=True) if segment_info else gr.update(visible=False)
            yield None, gr.update(value=status_msg, visible=True), status_msg, segment_update
        elif progress_val:
            # It's the final result (path, msg, info)
            yield progress_val, gr.update(value="Complete!", visible=True), "Generation Complete", gr.update(visible=True)
        else:
            # No audio path: the generator stopped on a validation failure or
            # produced nothing. Show its message instead of reporting success.
            message = status_msg if status_msg and status_msg != "Done" else "No audio was generated."
            yield None, gr.update(value=message, visible=True), message, gr.update(visible=False)
async def create_demo():
    """Build the Gradio Blocks UI and wire its event handlers.

    Returns:
        The assembled ``gr.Blocks`` app, not yet queued or launched.
    """
    # NOTE(review): the widget nesting below is reconstructed from context;
    # confirm it against the deployed layout.
    voices = await get_voices()
    description = """
Convert text to speech using Microsoft Edge TTS. Adjust speech rate and pitch: 0 is default, positive values increase, negative values decrease.
🎥 **Exciting News: Introducing our Text-to-Video Converter!** 🎥
Take your content creation to the next level with our cutting-edge Text-to-Video Converter!
Transform your words into stunning, professional-quality videos in just a few clicks.
✨ Features:
• Convert text to engaging videos with customizable visuals
• Choose from 40+ languages and 300+ voices
• Perfect for creating audiobooks, storytelling, and language learning materials
• Ideal for educators, content creators, and language enthusiasts
📝 **Long Text Support**:
Texts longer than 15 minutes will be **automatically segmented** into smaller chunks for processing and then **merged back** into a single high-quality audio file. This ensures stability and allows for unlimited text length!
"""
    # Preselect the first voice whose label mentions XiaoxiaoNeural, if any
    default_voice = ""
    for voice_key in voices.keys():
        if "XiaoxiaoNeural" in voice_key:
            default_voice = voice_key
            break

    with gr.Blocks(title="Edge TTS Text-to-Speech") as demo:
        gr.Markdown("# Edge TTS Text-to-Speech")
        gr.Markdown(description)
        with gr.Row():
            with gr.Column():
                text_input = gr.Textbox(label="Input Text", lines=8, placeholder="Enter your text here... Long texts will be automatically segmented if they exceed 15 minutes of speech time.")
                # File upload component
                file_upload = gr.File(
                    label="Or Upload File (TXT/EPUB)",
                    file_types=[".txt", ".epub"],
                    type="filepath"
                )
                # Add text analysis info
                text_info = gr.Markdown("**Text Analysis**: Enter text or upload a file to see estimated duration and segment count", visible=True)
                with gr.Accordion("Text Cleaning Settings", open=True):
                    with gr.Row():
                        enable_cleaning = gr.Checkbox(label="Enable Text Cleaning", value=True)
                        save_cleaned = gr.Checkbox(label="Save Cleaned Text File", value=True)
                    with gr.Group(visible=True) as cleaning_options_group:
                        with gr.Row():
                            clean_urls = gr.Checkbox(label="Remove URLs", value=True)
                            clean_html = gr.Checkbox(label="Remove HTML", value=True)
                        with gr.Row():
                            clean_markdown = gr.Checkbox(label="Remove Markdown", value=True)
                            clean_ads = gr.Checkbox(label="Filter Ads", value=True)
                        with gr.Row():
                            fix_enc = gr.Checkbox(label="Fix Encoding", value=True)
                            tidy_ws = gr.Checkbox(label="Tidy Whitespace", value=True)
                        with gr.Row():
                            del_gutenberg = gr.Checkbox(label="Remove Project Gutenberg", value=True)
                            del_special = gr.Checkbox(label="Remove Special Characters", value=True)
                        with gr.Row():
                            wetext_norm = gr.Checkbox(label="Enable WeText Normalization", value=True)

                    # Show the detailed options only while cleaning is enabled
                    def toggle_options(enabled):
                        return gr.update(visible=enabled)

                    enable_cleaning.change(fn=toggle_options, inputs=[enable_cleaning], outputs=[cleaning_options_group])
                voice_dropdown = gr.Dropdown(choices=[""] + list(voices.keys()), label="Select Voice", value=default_voice)
                with gr.Row():
                    rate_slider = gr.Slider(minimum=-50, maximum=50, value=0, label="Speech Rate (%)", step=1)
                    volume_slider = gr.Slider(minimum=-50, maximum=50, value=0, label="Speech Volume (%)", step=1)
                    pitch_slider = gr.Slider(minimum=-20, maximum=20, value=0, label="Pitch (Hz)", step=1)
                # Output format selection
                output_format = gr.Radio(
                    choices=["mp3", "m4b"],
                    value="mp3",
                    label="Output Format",
                    info="MP3 is default. M4B is audiobook format (requires ffmpeg)."
                )
                generate_btn = gr.Button("Generate Audio", variant="primary")
            with gr.Column():
                audio_output = gr.Audio(label="Generated Audio", type="filepath")
                # Progress and status display
                with gr.Group():
                    gr.Markdown("### 📊 Processing Progress")
                    progress_info = gr.Markdown("Ready, click Generate to start...", visible=True)
                # Processing details
                with gr.Accordion("🔍 Processing Details", open=True) as processing_details:
                    status_output = gr.Markdown("Waiting...", visible=True)
                # Segment information display
                with gr.Accordion("📋 Segment Information", open=True) as segment_info:
                    segment_details = gr.Markdown("Segment details will appear here for long texts", visible=True)
        gr.Markdown("Experience the power of Edge TTS for text-to-speech conversion, and explore our advanced Text-to-Video Converter for even more creative possibilities!")

        def describe_text(text):
            """Build the Markdown analysis line for *text*."""
            if not text or not text.strip():
                return "**Text Analysis**: Enter text or upload a file to see estimated duration and segment count"
            duration = estimate_text_duration(text)
            word_count = len(text.split())
            char_count = len(text)
            summary = f"**Text Analysis**: {word_count} words, {char_count} characters, ~{duration:.1f} minutes speech time"
            if duration > 15:
                segment_count = len(split_text_by_paragraphs(text))
                summary += f", {segment_count} segments will be generated"
            return summary

        # Analyze the text box content, which is what Generate actually uses.
        # An uploaded file is parsed once, in on_file_upload, not on every edit.
        def analyze_text(text):
            return describe_text(text)

        # Handle file upload - show the file's text in the text box
        def on_file_upload(uploaded_file):
            if uploaded_file is None:
                return gr.update(), "**Text Analysis**: Enter text or upload a file to see estimated duration and segment count"
            file_text, filename = parse_uploaded_file(uploaded_file)
            if not file_text:
                return gr.update(), "**Text Analysis**: Failed to parse uploaded file"
            return gr.update(value=file_text), describe_text(file_text)

        # Update text analysis when text changes
        text_input.change(
            fn=analyze_text,
            inputs=[text_input],
            outputs=[text_info]
        )
        # Update text box and analysis when file is uploaded
        file_upload.change(
            fn=on_file_upload,
            inputs=[file_upload],
            outputs=[text_input, text_info]
        )
        generate_btn.click(
            fn=tts_interface,
            inputs=[
                text_input, file_upload, voice_dropdown, rate_slider, volume_slider, pitch_slider,
                output_format, enable_cleaning, save_cleaned, clean_urls, clean_html,
                clean_markdown, clean_ads, fix_enc, tidy_ws, del_gutenberg,
                del_special, wetext_norm
            ],
            outputs=[audio_output, progress_info, status_output, segment_details]
        )
    return demo
async def main():
    """Build the Gradio app, enable its request queue, and launch it."""
    app = await create_demo()
    # Queue requests with a default concurrency limit of 5
    app.queue(default_concurrency_limit=5)
    app.launch(show_api=False)


# Script entry point
if __name__ == "__main__":
    asyncio.run(main())