Spaces:
Runtime error
Runtime error
| #!/usr/bin/env python3 | |
| """ | |
| EPUB processing pipeline for Russian Audiobook Studio. | |
| Integrates EPUB chapter detection with ESpeechBackend for TTS processing. | |
| """ | |
| import os | |
| import time | |
| import gc | |
| from typing import List, Optional, Callable, Dict, Any, Tuple | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| import numpy as np | |
| import soundfile as sf | |
| from epub_processor import EpubProcessor, Chapter | |
| from backends.espeech_backend import EspeechBackend | |
| from utils.text import normalize_text, split_into_paragraphs, maybe_ruaccent | |
| from utils.audio import crossfade_concat, normalize_lufs, save_wav | |
@dataclass
class ProcessingSettings:
    """Settings for the EPUB processing pipeline.

    Groups synthesis controls (speed, nfe_steps, seed), post-processing
    parameters (crossfade_ms, target_lufs) and the reference voice sample
    used for cloning (ref_audio_path, ref_text).

    Note: the ``@dataclass`` decorator is required — without it the
    generated keyword ``__init__`` does not exist and ``__post_init__``
    is never invoked.
    """

    speed: float = 1.0                 # playback-speed multiplier for the TTS backend
    nfe_steps: int = 48                # sampling steps used by the backend
    crossfade_ms: int = 150            # crossfade length between audio pieces
    target_lufs: float = -20.0         # loudness-normalization target
    seed: Optional[int] = None         # None -> backend picks its own seed
    ref_audio_path: str = "samples/001/sample.mp3"
    ref_text: str = ""                 # transcript of the reference audio
    # Voice and tone parameters
    target_rms: float = 0.1
    cfg_strength: float = 2.0
    sway_sampling_coef: float = -1.0

    def __post_init__(self):
        """Load default reference text if not provided."""
        if not self.ref_text:
            try:
                with open("samples/001/sample.text", "r", encoding="utf-8") as f:
                    self.ref_text = f.read().strip()
            except FileNotFoundError:
                # Best effort: an empty reference text is tolerated downstream.
                self.ref_text = ""
@dataclass
class ChapterProcessingResult:
    """Result of processing a single chapter.

    Instances are created with keyword arguments throughout the pipeline,
    which requires the ``@dataclass``-generated ``__init__`` (it was
    missing, making every construction raise ``TypeError``).
    """

    chapter_index: int                          # chapter order within the book
    chapter_title: str
    status: str                                 # pending, processing, completed, error (or cancelled)
    audio_data: Optional[np.ndarray] = None     # synthesized audio, set when completed
    sample_rate: Optional[int] = None           # sample rate of audio_data
    processing_time: float = 0.0                # wall-clock seconds spent on this chapter
    error_message: Optional[str] = None         # populated for error/cancelled results
    word_count: int = 0
    estimated_duration: float = 0.0             # predicted audio length in seconds
@dataclass
class ProcessingStatistics:
    """Aggregate statistics for a whole-book processing run.

    Built with keyword arguments by ``get_processing_statistics``; the
    ``@dataclass`` decorator (previously missing) provides that
    constructor.
    """

    total_chapters: int = 0
    completed_chapters: int = 0
    failed_chapters: int = 0
    total_processing_time: float = 0.0      # sum of per-chapter wall-clock times
    average_processing_time: float = 0.0    # total_processing_time / total_chapters
    total_audio_duration: float = 0.0       # sum of per-chapter estimated durations
    total_word_count: int = 0
class EpubProcessingPipeline:
    """Main pipeline for processing EPUB books into audiobooks.

    Orchestrates text normalization, per-paragraph TTS synthesis,
    crossfade concatenation, loudness normalization, and export of the
    resulting audio to WAV files.
    """

    def __init__(self, epub_processor: EpubProcessor, tts_backend: EspeechBackend):
        """
        Initialize the processing pipeline.

        Args:
            epub_processor: EPUB processor for chapter extraction
            tts_backend: TTS backend for audio synthesis
        """
        self.epub_processor = epub_processor
        self.tts_backend = tts_backend
        self.processing_settings = ProcessingSettings()
        # Eagerly load the model so the first chapter does not pay the
        # startup cost mid-run.
        self.tts_backend._ensure_loaded()

    @staticmethod
    def _cancelled_result(chapter: Chapter, start_time: float) -> ChapterProcessingResult:
        """Build a 'cancelled' result for *chapter* (shared by both cancellation checks)."""
        return ChapterProcessingResult(
            chapter_index=chapter.order,
            chapter_title=chapter.title,
            status="cancelled",
            processing_time=time.time() - start_time,
            error_message="Processing cancelled by user",
            word_count=chapter.word_count,
            estimated_duration=chapter.estimated_duration,
        )

    def process_chapter(
        self,
        chapter: Chapter,
        settings: ProcessingSettings,
        progress_callback: Optional[Callable[[int, str, str], None]] = None,
        cancellation_check: Optional[Callable[[], bool]] = None,
    ) -> ChapterProcessingResult:
        """
        Process a single chapter into audio.

        Args:
            chapter: Chapter to process
            settings: Processing settings
            progress_callback: Optional callback(order, status, message)
            cancellation_check: Optional callable; returning True aborts processing

        Returns:
            ChapterProcessingResult with processing status and audio data
        """
        start_time = time.time()
        if progress_callback:
            progress_callback(chapter.order, "processing", f"Processing {chapter.title}")
        # Check for cancellation before starting
        if cancellation_check and cancellation_check():
            return self._cancelled_result(chapter, start_time)
        try:
            # Normalize and prepare text
            normalized_text = normalize_text(chapter.content)
            paragraphs = split_into_paragraphs(normalized_text)
            paragraphs = [maybe_ruaccent(p) for p in paragraphs]
            if not paragraphs:
                raise ValueError("No text content to process")
            # Process each paragraph
            audio_pieces = []
            sample_rate = None
            for i, paragraph in enumerate(paragraphs):
                if not paragraph.strip():
                    continue
                # Synthesis is the slow part, so allow cancelling between paragraphs.
                if cancellation_check and cancellation_check():
                    return self._cancelled_result(chapter, start_time)
                # Synthesize audio for paragraph with enhanced voice parameters
                audio, sr = self.tts_backend.synthesize(
                    text=paragraph,
                    ref_audio_path=settings.ref_audio_path,
                    ref_text=settings.ref_text,
                    speed=settings.speed,
                    nfe_steps=settings.nfe_steps,
                    seed=settings.seed,
                    cross_fade_sec=settings.crossfade_ms / 1000.0,
                    target_rms=settings.target_rms,
                    cfg_strength=settings.cfg_strength,
                    sway_sampling_coef=settings.sway_sampling_coef,
                )
                audio_pieces.append(audio)
                sample_rate = sr
                # Periodic cleanup keeps memory bounded on long chapters.
                if i % 5 == 0:  # Every 5 paragraphs
                    gc.collect()
            if not audio_pieces:
                raise ValueError("No audio generated")
            # Concatenate audio pieces with crossfade
            final_audio = crossfade_concat(
                audio_pieces,
                crossfade_ms=settings.crossfade_ms,
                sample_rate=sample_rate,
            )
            # Normalize audio levels
            final_audio = normalize_lufs(
                final_audio,
                sample_rate,
                target_lufs=settings.target_lufs,
            )
            processing_time = time.time() - start_time
            if progress_callback:
                progress_callback(chapter.order, "completed", f"Completed {chapter.title}")
            return ChapterProcessingResult(
                chapter_index=chapter.order,
                chapter_title=chapter.title,
                status="completed",
                audio_data=final_audio,
                sample_rate=sample_rate,
                processing_time=processing_time,
                word_count=chapter.word_count,
                estimated_duration=chapter.estimated_duration,
            )
        except Exception as e:
            processing_time = time.time() - start_time
            error_msg = f"Error processing {chapter.title}: {str(e)}"
            if progress_callback:
                progress_callback(chapter.order, "error", error_msg)
            return ChapterProcessingResult(
                chapter_index=chapter.order,
                chapter_title=chapter.title,
                status="error",
                processing_time=processing_time,
                error_message=error_msg,
                word_count=chapter.word_count,
                estimated_duration=chapter.estimated_duration,
            )

    def process_chapter_with_retry(
        self,
        chapter: Chapter,
        settings: ProcessingSettings,
        max_retries: int = 2,
        progress_callback: Optional[Callable[[int, str, str], None]] = None,
        cancellation_check: Optional[Callable[[], bool]] = None,
    ) -> ChapterProcessingResult:
        """
        Process a chapter with retry mechanism for failed attempts.

        Args:
            chapter: Chapter to process
            settings: Processing settings
            max_retries: Maximum number of retry attempts
            progress_callback: Optional callback for progress updates
            cancellation_check: Optional callable; returning True aborts processing

        Returns:
            ChapterProcessingResult with processing status
        """
        last_result = None
        for attempt in range(max_retries + 1):
            if attempt > 0:
                if progress_callback:
                    progress_callback(chapter.order, "processing", f"Retry {attempt} for {chapter.title}")
                time.sleep(1)  # Brief pause before retry
            result = self.process_chapter(chapter, settings, progress_callback, cancellation_check)
            last_result = result
            # Bug fix: a cancelled result must not be retried — honour the
            # user's cancellation immediately, just like a success.
            if result.status in ("completed", "cancelled"):
                return result
        # All retries failed
        if progress_callback:
            progress_callback(chapter.order, "error", f"Failed after {max_retries} retries")
        return last_result

    def process_book(
        self,
        chapters: List[Chapter],
        settings: ProcessingSettings,
        progress_callback: Optional[Callable[[int, str, str], None]] = None,
        max_retries: int = 2,
        cancellation_check: Optional[Callable[[], bool]] = None,
    ) -> List[ChapterProcessingResult]:
        """
        Process an entire book with multiple chapters.

        Args:
            chapters: List of chapters to process
            settings: Processing settings
            progress_callback: Optional callback for progress updates
            max_retries: Maximum retries per chapter
            cancellation_check: Optional callable; returning True stops the run

        Returns:
            List of ChapterProcessingResult objects (one per chapter attempted)
        """
        results = []
        for chapter in chapters:
            # Check for cancellation before each chapter
            if cancellation_check and cancellation_check():
                break
            # Update chapter status
            self.epub_processor.update_chapter_status(chapters, chapter.order, "processing")
            # Process chapter with retry
            result = self.process_chapter_with_retry(
                chapter,
                settings,
                max_retries,
                progress_callback,
                cancellation_check,
            )
            # Update chapter status based on result
            if result.status == "completed":
                self.epub_processor.update_chapter_status(chapters, chapter.order, "completed")
            else:
                self.epub_processor.update_chapter_status(
                    chapters,
                    chapter.order,
                    "error",
                    result.error_message,
                )
            results.append(result)
            # Memory cleanup after each chapter
            gc.collect()
        return results

    def concatenate_chapter_audio(self, results: List[ChapterProcessingResult]) -> Optional[np.ndarray]:
        """
        Concatenate audio from multiple chapter results.

        Args:
            results: List of ChapterProcessingResult objects

        Returns:
            Concatenated audio array or None if no valid audio
        """
        valid_audio = []
        sample_rate = None
        for result in results:
            if result.status == "completed" and result.audio_data is not None:
                valid_audio.append(result.audio_data)
                # All chapters share the backend's rate; take the first one seen.
                if sample_rate is None:
                    sample_rate = result.sample_rate
        if not valid_audio:
            return None
        # Concatenate with crossfade
        return crossfade_concat(valid_audio, crossfade_ms=150, sample_rate=sample_rate)

    @staticmethod
    def _safe_filename_part(title: str) -> str:
        """Make *title* safe for use in a filename (no path separators etc.)."""
        return "".join(c if (c.isalnum() or c in "-_") else "_" for c in title)

    def export_audiobook(
        self,
        results: List[ChapterProcessingResult],
        output_path: str,
        export_individual_chapters: bool = True,
    ) -> Dict[str, str]:
        """
        Export processed audiobook to files.

        Args:
            results: List of ChapterProcessingResult objects
            output_path: Base path for output files
            export_individual_chapters: Whether to export individual chapter files

        Returns:
            Dictionary with paths to exported files
        """
        output_dir = Path(output_path).parent
        output_dir.mkdir(parents=True, exist_ok=True)
        exported_files = {}
        # Bug fix: derive the sample rate from the results up front so the
        # complete-book export works even when individual chapter export is
        # disabled (previously sample_rate stayed None in that case).
        sample_rate = next(
            (r.sample_rate for r in results
             if r.status == "completed" and r.audio_data is not None),
            None,
        )
        # Export individual chapters
        if export_individual_chapters:
            for result in results:
                if result.status == "completed" and result.audio_data is not None:
                    # Sanitize the title so characters like '/' cannot break the path.
                    safe_title = self._safe_filename_part(result.chapter_title)
                    chapter_filename = f"chapter_{result.chapter_index:03d}_{safe_title}.wav"
                    chapter_path = output_dir / chapter_filename
                    save_wav(str(chapter_path), result.audio_data, result.sample_rate)
                    exported_files[f"chapter_{result.chapter_index}"] = str(chapter_path)
        # Export complete audiobook
        concatenated_audio = self.concatenate_chapter_audio(results)
        if concatenated_audio is not None:
            # Use the exact output path specified by the user
            complete_path = Path(output_path)
            save_wav(str(complete_path), concatenated_audio, sample_rate)
            exported_files["complete"] = str(complete_path)
        return exported_files

    def get_processing_statistics(self, results: List[ChapterProcessingResult]) -> ProcessingStatistics:
        """
        Get processing statistics from results.

        Args:
            results: List of ChapterProcessingResult objects

        Returns:
            ProcessingStatistics object. Note: total_audio_duration sums
            the *estimated* durations of all results, including failures.
        """
        total_chapters = len(results)
        completed_chapters = sum(1 for r in results if r.status == "completed")
        failed_chapters = sum(1 for r in results if r.status == "error")
        total_processing_time = sum(r.processing_time for r in results)
        total_word_count = sum(r.word_count for r in results)
        total_audio_duration = sum(r.estimated_duration for r in results)
        # Guard against division by zero for an empty results list.
        average_processing_time = total_processing_time / total_chapters if total_chapters > 0 else 0.0
        return ProcessingStatistics(
            total_chapters=total_chapters,
            completed_chapters=completed_chapters,
            failed_chapters=failed_chapters,
            total_processing_time=total_processing_time,
            average_processing_time=average_processing_time,
            total_audio_duration=total_audio_duration,
            total_word_count=total_word_count,
        )

    def cleanup(self):
        """Clean up resources (temporary EPUB extraction files) and force a GC pass."""
        self.epub_processor.cleanup_temp_files()
        gc.collect()