audiobook-ru-tts / epub_processor.py
danilahs's picture
Upload folder using huggingface_hub
4f6648e verified
#!/usr/bin/env python3
"""
EPUB processing module for Russian Audiobook Studio.
Handles EPUB file validation, chapter extraction, and processing coordination.
"""
import html
import os
import re
import tempfile
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Dict, Any

from ebooklib import epub
from ebooklib.epub import EpubException
@dataclass
class Chapter:
    """One chapter extracted from an EPUB book.

    Attributes:
        title: Chapter title.
        content: Text content of the chapter.
        file_name: Name of the EPUB item the chapter came from.
        order: Position of the chapter in the extracted chapter list.
        preview: First 100-200 characters of the content, for preview.
        status: One of "pending", "processing", "completed" or "error".
        word_count: Number of words in the content.
        estimated_duration: Estimated duration in minutes.
        error_message: Optional error description; ``None`` by default.
    """

    # Required fields (no defaults).
    title: str
    content: str
    file_name: str
    order: int
    preview: str

    # Optional fields with their initial values.
    status: str = "pending"
    word_count: int = 0
    estimated_duration: float = 0.0
    error_message: Optional[str] = None
@dataclass
class EpubValidationResult:
    """Outcome of validating an EPUB file.

    Attributes:
        is_valid: Whether the file passed validation.
        error_message: Description of the failure, or ``None``.
        chapters: Chapters extracted from the book.
        book_title: Title of the book, or ``None``.
        book_author: Author of the book, or ``None``.
        total_chapters: Number of chapters reported for the book.
    """

    # Validation verdict.
    is_valid: bool
    error_message: Optional[str]

    # Extracted book data.
    chapters: List[Chapter]
    book_title: Optional[str]
    book_author: Optional[str]
    total_chapters: int
class EpubValidationError(Exception):
    """Exception type used for EPUB validation errors."""
class EpubValidator:
    """Validates EPUB files and extracts chapter information.

    ``validate_file`` is the entry point. It does not raise: every failure is
    reported through ``EpubValidationResult.error_message``.
    """

    MAX_FILE_SIZE = 500 * 1024 * 1024  # 500MB limit
    MIN_PREVIEW_LENGTH = 100
    MAX_PREVIEW_LENGTH = 200
    # Items whose extracted text is shorter than this are skipped.
    MIN_CHAPTER_LENGTH = 50
    # Numeric item type treated as an HTML document.
    ITEM_DOCUMENT = 9
    # Russian text: ~150-200 words per minute for speech synthesis.
    WORDS_PER_MINUTE = 180.0
    # Multiplier adding some buffer on top of the raw estimate.
    DURATION_BUFFER = 1.1
    # Characters a preview may end on ('.', '!', '?', ellipsis).
    SENTENCE_ENDINGS = ".!?\u2026"

    _COMMENT_RE = re.compile(r'<!--.*?-->', re.DOTALL)
    # Elements whose *content* is markup metadata or code, not narrated text.
    _SKIPPED_ELEMENT_RE = re.compile(
        r'<(script|style|head)\b[^>]*>.*?</\1\s*>',
        re.IGNORECASE | re.DOTALL,
    )
    # Tags that separate words; they are replaced by a space so that
    # "<p>a</p><p>b</p>" becomes "a b" instead of "ab".
    _BLOCK_TAG_RE = re.compile(
        r'</?(?:address|article|aside|blockquote|body|br|dd|div|dl|dt|'
        r'figcaption|figure|footer|h[1-6]|header|hr|html|li|main|nav|ol|p|'
        r'pre|section|table|tbody|td|tfoot|th|thead|title|tr|ul)\b[^>]*>',
        re.IGNORECASE,
    )
    _TAG_RE = re.compile(r'<[^>]+>')
    _WHITESPACE_RE = re.compile(r'\s+')
    _HEADING_RE = re.compile(
        r'<h([1-3])\b[^>]*>(.*?)</h\1\s*>',
        re.IGNORECASE | re.DOTALL,
    )
    _DECLARED_ENCODING_RE = re.compile(
        rb'(?:encoding|charset)\s*=\s*["\']?([A-Za-z0-9._-]+)',
        re.IGNORECASE,
    )

    def __init__(self):
        self.supported_extensions = ['.epub']

    def validate_file(self, file_path: str) -> "EpubValidationResult":
        """
        Validate an EPUB file and extract chapter information.

        Args:
            file_path: Path to the EPUB file

        Returns:
            EpubValidationResult with validation status and chapter
            information. Failures are reported via ``is_valid=False`` and
            ``error_message``; no exception is propagated to the caller.
        """
        if not file_path:
            return self._invalid("No file path provided")

        # Check if file exists
        if not os.path.exists(file_path):
            return self._invalid(f"File does not exist: {file_path}")

        # Check file extension
        if not self._is_epub_file(file_path):
            return self._invalid(
                "File is not an EPUB file. Please upload a .epub file."
            )

        # Check file size; the file may vanish or be unreadable between the
        # existence check and this call.
        try:
            file_size = os.path.getsize(file_path)
        except OSError as e:
            return self._invalid(f"Error reading EPUB file: {str(e)}")
        if file_size == 0:
            return self._invalid("File is empty")
        if file_size > self.MAX_FILE_SIZE:
            return self._invalid(
                f"File is too large. Maximum size is {self.MAX_FILE_SIZE // (1024*1024)}MB"
            )

        # Try to parse the EPUB
        try:
            return self._parse_epub(file_path)
        except EpubException as e:
            # _parse_epub wraps its failures in EpubValidationError, so this
            # branch only fires if that wrapping is bypassed.
            return self._invalid(f"Invalid EPUB file: {str(e)}")
        except Exception as e:
            return self._invalid(f"Error reading EPUB file: {str(e)}")

    def _invalid(self, message: str, book_title: Optional[str] = None,
                 book_author: Optional[str] = None) -> "EpubValidationResult":
        """Build a failed validation result carrying ``message``."""
        return EpubValidationResult(
            is_valid=False,
            error_message=message,
            chapters=[],
            book_title=book_title,
            book_author=book_author,
            total_chapters=0
        )

    def _is_epub_file(self, file_path: str) -> bool:
        """Check if file has EPUB extension."""
        return Path(file_path).suffix.lower() in self.supported_extensions

    def _parse_epub(self, file_path: str) -> "EpubValidationResult":
        """Parse EPUB file and extract chapter information.

        Raises:
            EpubValidationError: If reading the book or extracting its
                chapters fails; the original exception is chained as cause.
        """
        try:
            book = epub.read_epub(file_path)

            # Extract book metadata
            title = self._first_metadata(book, 'title', "Unknown Title")
            author = self._first_metadata(book, 'creator', "Unknown Author")

            # Extract chapters
            chapters = self._extract_chapters(book)
            if not chapters:
                return self._invalid(
                    "No readable chapters found in EPUB file",
                    book_title=title,
                    book_author=author,
                )

            return EpubValidationResult(
                is_valid=True,
                error_message=None,
                chapters=chapters,
                book_title=title,
                book_author=author,
                total_chapters=len(chapters)
            )
        except Exception as e:
            raise EpubValidationError(f"Failed to parse EPUB: {str(e)}") from e

    def _first_metadata(self, book, name: str, default: str) -> str:
        """Return the first non-empty Dublin Core ``name`` value, else ``default``."""
        values = book.get_metadata('DC', name)
        if values and values[0] and values[0][0]:
            return values[0][0]
        return default

    def _extract_chapters(self, book: "epub.EpubBook") -> "List[Chapter]":
        """Extract chapters from EPUB book.

        Items are taken from the spine (reading order); if the spine yields
        nothing, every document item of the book is used instead. Items that
        are not HTML documents or whose text is very short are skipped.
        """
        candidates = self._spine_items(book)

        # If no spine items, get all document items
        if not candidates:
            candidates = [
                item for item in book.get_items()
                if item.get_type() == self.ITEM_DOCUMENT
            ]

        chapters = []
        for item in candidates:
            # Check if item is HTML content
            if item.get_type() != self.ITEM_DOCUMENT:
                continue

            # Extract text content
            content = self._extract_text_content(item)
            if not content or len(content.strip()) < self.MIN_CHAPTER_LENGTH:
                continue  # Skip very short chapters

            # Chapters are numbered by how many were accepted so far.
            order = len(chapters)
            chapters.append(Chapter(
                title=self._get_chapter_title(item, order),
                content=content,
                file_name=item.get_name(),
                order=order,
                preview=self._create_preview(content),
                word_count=self._count_words(content),
                estimated_duration=self._estimate_duration(content)
            ))
        return chapters

    def _spine_items(self, book) -> list:
        """Resolve the book's spine to items, skipping non-linear entries.

        A spine entry may be an ``(idref, linear)`` pair, a bare id string or
        an item object. ``linear`` is a string ("yes"/"no") when present, so
        it is compared by value rather than by truthiness.
        """
        items = []
        for entry in getattr(book, 'spine', None) or []:
            ref, linear = entry, 'yes'
            if isinstance(entry, (tuple, list)):
                if not entry:
                    continue
                ref = entry[0]
                if len(entry) > 1:
                    linear = entry[1]
            if not linear or str(linear).strip().lower() == 'no':
                continue

            if isinstance(ref, str):
                item = book.get_item_with_id(ref)
            else:
                item = ref if hasattr(ref, 'get_type') else None
            if item:
                items.append(item)
        return items

    def _decode_content(self, raw_content) -> str:
        """Decode item content to ``str``.

        Tries UTF-8 (dropping a BOM), then the encoding declared in the
        document prolog or meta tag, then cp1252. latin-1 is the final
        fallback because it can decode any byte sequence.
        """
        if not isinstance(raw_content, bytes):
            return str(raw_content)

        candidates = ['utf-8-sig']
        if raw_content[:2] in (b'\xff\xfe', b'\xfe\xff'):
            # UTF-16 byte order mark.
            candidates.insert(0, 'utf-16')
        declared = self._DECLARED_ENCODING_RE.search(raw_content[:1024])
        if declared:
            candidates.append(declared.group(1).decode('ascii'))
        candidates.append('cp1252')

        for encoding in candidates:
            try:
                return raw_content.decode(encoding)
            except (UnicodeError, LookupError):
                # Wrong guess or unknown codec name: try the next candidate.
                continue
        return raw_content.decode('latin-1')

    def _extract_text_content(self, item) -> str:
        """Extract plain text from an EPUB item.

        Removes comments, ``<head>``/``<style>``/``<script>`` content and all
        tags, turns HTML entities into characters and collapses whitespace.
        Returns an empty string (after printing a warning) on any failure.
        """
        try:
            markup = self._decode_content(item.get_content())
            markup = self._COMMENT_RE.sub('', markup)
            markup = self._SKIPPED_ELEMENT_RE.sub(' ', markup)
            # Block-level tags separate words; inline tags do not.
            markup = self._BLOCK_TAG_RE.sub(' ', markup)
            markup = self._TAG_RE.sub('', markup)
            # Unescape only after tag removal so "&lt;b&gt;" stays literal text.
            text = html.unescape(markup)
            # Clean up whitespace
            return self._WHITESPACE_RE.sub(' ', text).strip()
        except Exception as e:
            print(f"Warning: Could not extract content from {item.get_name()}: {e}")
            return ""

    def _get_chapter_title(self, item, order: int) -> str:
        """Get chapter title from item or generate default.

        Tried in order: the first non-empty h1-h3 heading in the content, the
        item's ``title`` attribute, a cleaned-up file name, and finally
        ``"Chapter <order + 1>"``.
        """
        # Try to extract title from content
        try:
            markup = self._decode_content(item.get_content())
            for heading in self._HEADING_RE.finditer(markup):
                # Headings may contain inline markup such as <span>.
                title = self._TAG_RE.sub('', heading.group(2))
                title = self._WHITESPACE_RE.sub(' ', html.unescape(title)).strip()
                if title:
                    return title
        except Exception:
            # Best effort: unreadable content just means no heading title.
            pass

        # Try to get title from item metadata
        item_title = getattr(item, 'title', None)
        if item_title:
            return str(item_title)

        # Fallback to file name or default
        file_name = item.get_name()
        if file_name:
            # Clean up file name to make it more readable
            clean_name = Path(file_name).stem.replace('_', ' ').replace('-', ' ')
            clean_name = re.sub(r'\d+', '', clean_name)  # Remove numbers
            clean_name = self._WHITESPACE_RE.sub(' ', clean_name).strip()
            if clean_name:
                return clean_name.title()
        return f"Chapter {order + 1}"

    def _create_preview(self, content: str) -> str:
        """Create preview text from chapter content.

        Content longer than ``MAX_PREVIEW_LENGTH`` is cut there, then trimmed
        back to the last sentence ending if one lies past
        ``MIN_PREVIEW_LENGTH``; otherwise "..." is appended.
        """
        if not content:
            return ""

        preview = content.strip()
        if len(preview) <= self.MAX_PREVIEW_LENGTH:
            return preview

        preview = preview[:self.MAX_PREVIEW_LENGTH]
        # Try to end at a sentence boundary
        last_boundary = max(preview.rfind(ch) for ch in self.SENTENCE_ENDINGS)
        if last_boundary > self.MIN_PREVIEW_LENGTH:
            return preview[:last_boundary + 1]
        return preview + "..."

    def _count_words(self, content: str) -> int:
        """Count whitespace-separated words in content."""
        if not content:
            return 0
        # str.split() with no argument never yields empty strings.
        return len(content.split())

    def _estimate_duration(self, content: str) -> float:
        """Estimate audio duration in minutes based on content length."""
        if not content:
            return 0.0
        duration_minutes = self._count_words(content) / self.WORDS_PER_MINUTE
        # Add some buffer for processing time
        return round(duration_minutes * self.DURATION_BUFFER, 1)
class EpubProcessor:
    """Main EPUB processor for handling EPUB files in the web interface.

    Creates an ``EpubValidator`` and a private temporary directory on
    construction; the directory is removed by ``cleanup_temp_files`` (also
    invoked from ``__del__``).
    """

    def __init__(self):
        self.validator = EpubValidator()
        self.temp_dir = tempfile.mkdtemp(prefix="epub_processing_")

    def process_epub_upload(self, file_path: str) -> "EpubValidationResult":
        """
        Process an uploaded EPUB file.

        Args:
            file_path: Path to uploaded EPUB file

        Returns:
            EpubValidationResult with validation status and chapter information
        """
        return self.validator.validate_file(file_path)

    def update_chapter_status(self, chapters: "List[Chapter]", chapter_index: int,
                              status: str, error_message: Optional[str] = None):
        """Update the status of a specific chapter.

        An out-of-range ``chapter_index`` is ignored. When ``error_message``
        is given it is stored; otherwise a message left over from an earlier
        failure is cleared unless the new status is still "error", so that a
        retried chapter does not keep showing a stale error.
        """
        if not 0 <= chapter_index < len(chapters):
            return

        chapter = chapters[chapter_index]
        chapter.status = status
        if error_message:
            chapter.error_message = error_message
        elif status != "error":
            chapter.error_message = None

    def get_chapter_status_summary(self, chapters: "List[Chapter]") -> Dict[str, int]:
        """Get a summary of chapter statuses.

        Returns:
            Counts keyed by "pending", "processing", "completed" and "error".
            Chapters with any other status are not counted.
        """
        summary = {"pending": 0, "processing": 0, "completed": 0, "error": 0}
        for chapter in chapters:
            if chapter.status in summary:
                summary[chapter.status] += 1
        return summary

    def get_total_estimated_duration(self, chapters: "List[Chapter]") -> float:
        """Get total estimated duration for all chapters."""
        return sum(chapter.estimated_duration for chapter in chapters)

    def get_total_word_count(self, chapters: "List[Chapter]") -> int:
        """Get total word count for all chapters."""
        return sum(chapter.word_count for chapter in chapters)

    def cleanup_temp_files(self):
        """Clean up temporary files (best effort, never raises)."""
        # temp_dir is missing if __init__ did not run to completion.
        temp_dir = getattr(self, "temp_dir", None)
        if not temp_dir:
            return
        try:
            # Imported inside the try: this can run from __del__ during
            # interpreter shutdown, when importing may itself fail.
            import shutil
            shutil.rmtree(temp_dir, ignore_errors=True)
        except Exception:
            # Cleanup must not raise from a destructor.
            pass

    def __del__(self):
        """Cleanup on destruction."""
        self.cleanup_temp_files()