# audiobook-ru-tts / epub_processing_pipeline.py
# Uploaded by danilahs via huggingface_hub (revision 4f6648e, verified)
#!/usr/bin/env python3
"""
EPUB processing pipeline for Russian Audiobook Studio.
Integrates EPUB chapter detection with ESpeechBackend for TTS processing.
"""
import os
import time
import gc
from typing import List, Optional, Callable, Dict, Any, Tuple
from dataclasses import dataclass
from pathlib import Path
import numpy as np
import soundfile as sf
from epub_processor import EpubProcessor, Chapter
from backends.espeech_backend import EspeechBackend
from utils.text import normalize_text, split_into_paragraphs, maybe_ruaccent
from utils.audio import crossfade_concat, normalize_lufs, save_wav
@dataclass
class ProcessingSettings:
    """Tunable parameters for the EPUB-to-audiobook pipeline."""
    speed: float = 1.0
    nfe_steps: int = 48
    crossfade_ms: int = 150
    target_lufs: float = -20.0
    seed: Optional[int] = None
    ref_audio_path: str = "samples/001/sample.mp3"
    ref_text: str = ""
    # Voice and tone parameters
    target_rms: float = 0.1
    cfg_strength: float = 2.0
    sway_sampling_coef: float = -1.0

    def __post_init__(self):
        """Fall back to the bundled sample transcript when no ref text is given."""
        if self.ref_text:
            return
        try:
            with open("samples/001/sample.text", "r", encoding="utf-8") as fh:
                self.ref_text = fh.read().strip()
        except FileNotFoundError:
            # No bundled transcript available; leave the reference text empty.
            self.ref_text = ""
@dataclass
class ChapterProcessingResult:
    """Result of processing a single chapter.

    Produced by EpubProcessingPipeline.process_chapter; on success
    audio_data/sample_rate hold the synthesized audio, otherwise
    error_message explains the failure or cancellation.
    """
    # Chapter order copied from Chapter.order — base (0 or 1) not visible here; confirm
    chapter_index: int
    # Human-readable title; also used to build per-chapter export filenames
    chapter_title: str
    status: str  # pending, processing, completed, error (pipeline also sets "cancelled" on user abort)
    # Synthesized waveform; None unless status == "completed"
    audio_data: Optional[np.ndarray] = None
    # Sample rate of audio_data in Hz; None when no audio was generated
    sample_rate: Optional[int] = None
    # Wall-clock seconds spent on this chapter (including failed attempts' time for that call)
    processing_time: float = 0.0
    # Failure/cancellation reason; None on success
    error_message: Optional[str] = None
    # Word count copied from the source Chapter
    word_count: int = 0
    # Estimated duration copied from the source Chapter (presumably seconds — confirm)
    estimated_duration: float = 0.0
@dataclass
class ProcessingStatistics:
    """Aggregate statistics for a whole-book processing run.

    Built by EpubProcessingPipeline.get_processing_statistics from a list
    of ChapterProcessingResult objects.
    """
    # Number of chapter results inspected
    total_chapters: int = 0
    # Chapters whose final status is "completed"
    completed_chapters: int = 0
    # Chapters whose final status is "error" (cancelled chapters count in neither bucket)
    failed_chapters: int = 0
    # Sum of per-chapter wall-clock processing times (seconds)
    total_processing_time: float = 0.0
    # total_processing_time / total_chapters, or 0.0 when there are no chapters
    average_processing_time: float = 0.0
    # Sum of per-chapter *estimated* durations, not measured audio length
    total_audio_duration: float = 0.0
    # Sum of per-chapter word counts
    total_word_count: int = 0
class EpubProcessingPipeline:
    """Main pipeline for processing EPUB books into audiobooks.

    Drives chapter-by-chapter TTS synthesis through the configured backend,
    supports per-chapter retries and cooperative cancellation, and exports
    the finished audio to WAV files.
    """

    def __init__(self, epub_processor: EpubProcessor, tts_backend: EspeechBackend):
        """
        Initialize the processing pipeline.

        Args:
            epub_processor: EPUB processor for chapter extraction
            tts_backend: TTS backend for audio synthesis
        """
        self.epub_processor = epub_processor
        self.tts_backend = tts_backend
        self.processing_settings = ProcessingSettings()
        # Load the backend eagerly so the first chapter does not pay the
        # model start-up cost mid-run.
        self.tts_backend._ensure_loaded()

    def _cancelled_result(self, chapter: Chapter, elapsed: float) -> ChapterProcessingResult:
        """Build a 'cancelled' result for *chapter* after *elapsed* seconds."""
        return ChapterProcessingResult(
            chapter_index=chapter.order,
            chapter_title=chapter.title,
            status="cancelled",
            processing_time=elapsed,
            error_message="Processing cancelled by user",
            word_count=chapter.word_count,
            estimated_duration=chapter.estimated_duration
        )

    def process_chapter(
        self,
        chapter: Chapter,
        settings: ProcessingSettings,
        progress_callback: Optional[Callable[[int, str, str], None]] = None,
        cancellation_check: Optional[Callable[[], bool]] = None
    ) -> ChapterProcessingResult:
        """
        Process a single chapter into audio.

        Args:
            chapter: Chapter to process
            settings: Processing settings
            progress_callback: Optional callback(chapter_order, status, message)
            cancellation_check: Optional callable returning True to abort

        Returns:
            ChapterProcessingResult with processing status and audio data
        """
        start_time = time.time()
        if progress_callback:
            progress_callback(chapter.order, "processing", f"Processing {chapter.title}")
        # Check for cancellation before doing any work
        if cancellation_check and cancellation_check():
            return self._cancelled_result(chapter, 0.0)
        try:
            # Normalize and split the chapter text, applying Russian
            # accentuation where available
            normalized_text = normalize_text(chapter.content)
            paragraphs = split_into_paragraphs(normalized_text)
            paragraphs = [maybe_ruaccent(p) for p in paragraphs]
            if not paragraphs:
                raise ValueError("No text content to process")
            # Synthesize each paragraph separately
            audio_pieces = []
            sample_rate = None
            for i, paragraph in enumerate(paragraphs):
                if not paragraph.strip():
                    continue
                # Check for cancellation before each paragraph
                if cancellation_check and cancellation_check():
                    return self._cancelled_result(chapter, time.time() - start_time)
                # Synthesize audio for the paragraph with voice parameters
                audio, sr = self.tts_backend.synthesize(
                    text=paragraph,
                    ref_audio_path=settings.ref_audio_path,
                    ref_text=settings.ref_text,
                    speed=settings.speed,
                    nfe_steps=settings.nfe_steps,
                    seed=settings.seed,
                    cross_fade_sec=settings.crossfade_ms / 1000.0,
                    target_rms=settings.target_rms,
                    cfg_strength=settings.cfg_strength,
                    sway_sampling_coef=settings.sway_sampling_coef
                )
                audio_pieces.append(audio)
                sample_rate = sr
                # Periodic memory cleanup every 5 paragraphs
                if i % 5 == 0:
                    gc.collect()
            if not audio_pieces:
                raise ValueError("No audio generated")
            # Concatenate audio pieces with crossfade
            final_audio = crossfade_concat(
                audio_pieces,
                crossfade_ms=settings.crossfade_ms,
                sample_rate=sample_rate
            )
            # Normalize loudness to the configured LUFS target
            final_audio = normalize_lufs(
                final_audio,
                sample_rate,
                target_lufs=settings.target_lufs
            )
            processing_time = time.time() - start_time
            if progress_callback:
                progress_callback(chapter.order, "completed", f"Completed {chapter.title}")
            return ChapterProcessingResult(
                chapter_index=chapter.order,
                chapter_title=chapter.title,
                status="completed",
                audio_data=final_audio,
                sample_rate=sample_rate,
                processing_time=processing_time,
                word_count=chapter.word_count,
                estimated_duration=chapter.estimated_duration
            )
        except Exception as e:
            processing_time = time.time() - start_time
            error_msg = f"Error processing {chapter.title}: {str(e)}"
            if progress_callback:
                progress_callback(chapter.order, "error", error_msg)
            return ChapterProcessingResult(
                chapter_index=chapter.order,
                chapter_title=chapter.title,
                status="error",
                processing_time=processing_time,
                error_message=error_msg,
                word_count=chapter.word_count,
                estimated_duration=chapter.estimated_duration
            )

    def process_chapter_with_retry(
        self,
        chapter: Chapter,
        settings: ProcessingSettings,
        max_retries: int = 2,
        progress_callback: Optional[Callable[[int, str, str], None]] = None,
        cancellation_check: Optional[Callable[[], bool]] = None
    ) -> ChapterProcessingResult:
        """
        Process a chapter with retry mechanism for failed attempts.

        Args:
            chapter: Chapter to process
            settings: Processing settings
            max_retries: Maximum number of retry attempts
            progress_callback: Optional callback(chapter_order, status, message)
            cancellation_check: Optional callable returning True to abort

        Returns:
            ChapterProcessingResult with processing status
        """
        last_result = None
        for attempt in range(max_retries + 1):
            if attempt > 0:
                if progress_callback:
                    progress_callback(chapter.order, "processing", f"Retry {attempt} for {chapter.title}")
                time.sleep(1)  # Brief pause before retry
            result = self.process_chapter(chapter, settings, progress_callback, cancellation_check)
            last_result = result
            # Stop on success, and also on cancellation: retrying a
            # deliberate user abort would just burn the retry budget and
            # misreport the cancellation as an error.
            if result.status in ("completed", "cancelled"):
                return result
        # All retries failed
        if progress_callback:
            progress_callback(chapter.order, "error", f"Failed after {max_retries} retries")
        return last_result

    def process_book(
        self,
        chapters: List[Chapter],
        settings: ProcessingSettings,
        progress_callback: Optional[Callable[[int, str, str], None]] = None,
        max_retries: int = 2,
        cancellation_check: Optional[Callable[[], bool]] = None
    ) -> List[ChapterProcessingResult]:
        """
        Process an entire book with multiple chapters.

        Args:
            chapters: List of chapters to process
            settings: Processing settings
            progress_callback: Optional callback(chapter_order, status, message)
            max_retries: Maximum retries per chapter
            cancellation_check: Optional callable returning True to abort;
                checked before each chapter (stops cleanly, already-finished
                results are kept)

        Returns:
            List of ChapterProcessingResult objects (may be shorter than
            ``chapters`` if processing was cancelled)
        """
        results = []
        for chapter in chapters:
            # Stop cleanly if the user cancelled between chapters
            if cancellation_check and cancellation_check():
                break
            # Mark the chapter as in-progress in the shared chapter list
            self.epub_processor.update_chapter_status(chapters, chapter.order, "processing")
            result = self.process_chapter_with_retry(
                chapter,
                settings,
                max_retries,
                progress_callback,
                cancellation_check
            )
            # Reflect the outcome back into the shared chapter list
            if result.status == "completed":
                self.epub_processor.update_chapter_status(chapters, chapter.order, "completed")
            else:
                self.epub_processor.update_chapter_status(
                    chapters,
                    chapter.order,
                    "error",
                    result.error_message
                )
            results.append(result)
            # Memory cleanup after each chapter
            gc.collect()
        return results

    def concatenate_chapter_audio(
        self,
        results: List[ChapterProcessingResult],
        crossfade_ms: int = 150
    ) -> Optional[np.ndarray]:
        """
        Concatenate audio from multiple chapter results.

        Args:
            results: List of ChapterProcessingResult objects
            crossfade_ms: Crossfade length between chapters in milliseconds
                (default 150, matching the previous hard-coded value)

        Returns:
            Concatenated audio array or None if no valid audio
        """
        valid_audio = []
        sample_rate = None
        for result in results:
            if result.status == "completed" and result.audio_data is not None:
                valid_audio.append(result.audio_data)
                # Use the first completed chapter's sample rate for the join
                if sample_rate is None:
                    sample_rate = result.sample_rate
        if not valid_audio:
            return None
        return crossfade_concat(valid_audio, crossfade_ms=crossfade_ms, sample_rate=sample_rate)

    def export_audiobook(
        self,
        results: List[ChapterProcessingResult],
        output_path: str,
        export_individual_chapters: bool = True
    ) -> Dict[str, str]:
        """
        Export processed audiobook to files.

        Args:
            results: List of ChapterProcessingResult objects
            output_path: Base path for output files (the complete audiobook
                is written exactly here; chapter files go in its directory)
            export_individual_chapters: Whether to export individual chapter files

        Returns:
            Dictionary with paths to exported files ("chapter_<n>" keys plus
            "complete" when any audio was produced)
        """
        output_dir = Path(output_path).parent
        output_dir.mkdir(parents=True, exist_ok=True)
        exported_files: Dict[str, str] = {}
        # Determine the output sample rate from the first completed chapter
        # regardless of whether individual files are exported; otherwise the
        # complete file would be saved with sample_rate=None when
        # export_individual_chapters is False.
        sample_rate = next(
            (r.sample_rate for r in results
             if r.status == "completed" and r.audio_data is not None),
            None
        )
        # Export individual chapters
        if export_individual_chapters:
            for result in results:
                if result.status == "completed" and result.audio_data is not None:
                    # Sanitize the title so it is always a valid filename
                    # (titles may contain path separators or punctuation)
                    safe_title = "".join(
                        c if (c.isalnum() or c in "-_") else "_"
                        for c in result.chapter_title
                    )
                    chapter_filename = f"chapter_{result.chapter_index:03d}_{safe_title}.wav"
                    chapter_path = output_dir / chapter_filename
                    save_wav(str(chapter_path), result.audio_data, result.sample_rate)
                    exported_files[f"chapter_{result.chapter_index}"] = str(chapter_path)
        # Export the complete audiobook at the exact user-specified path
        concatenated_audio = self.concatenate_chapter_audio(results)
        if concatenated_audio is not None:
            complete_path = Path(output_path)
            save_wav(str(complete_path), concatenated_audio, sample_rate)
            exported_files["complete"] = str(complete_path)
        return exported_files

    def get_processing_statistics(self, results: List[ChapterProcessingResult]) -> ProcessingStatistics:
        """
        Get processing statistics from results.

        Args:
            results: List of ChapterProcessingResult objects

        Returns:
            ProcessingStatistics object (average_processing_time is 0.0 for
            an empty result list)
        """
        total_chapters = len(results)
        completed_chapters = sum(1 for r in results if r.status == "completed")
        failed_chapters = sum(1 for r in results if r.status == "error")
        total_processing_time = sum(r.processing_time for r in results)
        total_word_count = sum(r.word_count for r in results)
        # NOTE: this sums *estimated* durations, not measured audio lengths
        total_audio_duration = sum(r.estimated_duration for r in results)
        average_processing_time = total_processing_time / total_chapters if total_chapters > 0 else 0.0
        return ProcessingStatistics(
            total_chapters=total_chapters,
            completed_chapters=completed_chapters,
            failed_chapters=failed_chapters,
            total_processing_time=total_processing_time,
            average_processing_time=average_processing_time,
            total_audio_duration=total_audio_duration,
            total_word_count=total_word_count
        )

    def cleanup(self):
        """Clean up temporary EPUB files and force a GC pass."""
        self.epub_processor.cleanup_temp_files()
        gc.collect()