Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """ | |
| EPUB processing module for Russian Audiobook Studio. | |
| Handles EPUB file validation, chapter extraction, and processing coordination. | |
| """ | |
import html
import os
import re
import shutil
import tempfile
import zipfile
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional, Dict, Any

from ebooklib import epub
from ebooklib.epub import EpubException
@dataclass
class Chapter:
    """Represents a chapter in an EPUB book.

    The class must be a dataclass: instances are created with keyword
    arguments, which requires the generated ``__init__``.
    """

    title: str  # Human-readable chapter title
    content: str  # Plain-text chapter body
    file_name: str  # Name of the document inside the EPUB archive
    order: int  # Position of the chapter in the extracted list
    preview: str  # First 100-200 characters for preview
    status: str = "pending"  # pending, processing, completed, error
    word_count: int = 0
    estimated_duration: float = 0.0  # Estimated duration in minutes
    error_message: Optional[str] = None  # Set when processing fails
@dataclass
class EpubValidationResult:
    """Result of EPUB file validation.

    The class must be a dataclass: every field is supplied as a keyword
    argument when a result is built, which requires the generated
    ``__init__``.
    """

    is_valid: bool  # True when the file was parsed and chapters were found
    error_message: Optional[str]  # Reason for failure, None on success
    chapters: List[Chapter]  # Extracted chapters (empty on failure)
    book_title: Optional[str]
    book_author: Optional[str]
    total_chapters: int  # Number of entries in ``chapters``
class EpubValidationError(Exception):
    """Error type signalling that an EPUB file failed validation or parsing."""
class EpubValidator:
    """Validates EPUB files and extracts chapter information."""

    MAX_FILE_SIZE = 500 * 1024 * 1024  # 500MB limit
    MIN_PREVIEW_LENGTH = 100
    MAX_PREVIEW_LENGTH = 200
    # Documents whose extracted text is shorter than this are not chapters.
    MIN_CHAPTER_LENGTH = 50
    # Value returned by ``item.get_type()`` for HTML documents
    # (ebooklib.ITEM_DOCUMENT).
    HTML_DOCUMENT_TYPE = 9
    # Speech rate used for duration estimates (words per minute).
    WORDS_PER_MINUTE = 180.0
    # Multiplier that adds a 10% buffer on top of the raw estimate.
    DURATION_BUFFER = 1.1
    # Characters accepted as the end of a sentence when trimming a preview.
    _SENTENCE_END_MARKS = ".!?\u2026"

    # Elements whose text must never reach the narration. Self-closing
    # forms (e.g. ``<script src="x"/>``) are excluded so that they cannot
    # swallow everything up to a later closing tag.
    _HIDDEN_ELEMENT_RE = re.compile(
        r"<(head|script|style)\b(?![^>]*/>)[^>]*>.*?</\1\s*>",
        re.IGNORECASE | re.DOTALL,
    )
    # Block-level tags act as word separators even without whitespace
    # around them, so they are replaced by a space rather than removed.
    _BLOCK_TAG_RE = re.compile(
        r"</?(?:p|div|br|hr|h[1-6]|li|ul|ol|dl|dt|dd|tr|td|th|table|"
        r"blockquote|section|article|pre)\b[^>]*>",
        re.IGNORECASE,
    )
    _TAG_RE = re.compile(r"<[^>]+>")
    _WHITESPACE_RE = re.compile(r"\s+")
    _HEADING_RE = re.compile(
        r"<h([1-3])\b[^>]*>(.*?)</h\1\s*>", re.IGNORECASE | re.DOTALL
    )

    def __init__(self):
        self.supported_extensions = ['.epub']

    def validate_file(self, file_path: str) -> EpubValidationResult:
        """
        Validate an EPUB file and extract chapter information.

        Every problem (missing file, wrong extension, bad size, parse
        failure) is reported through the returned result; this method does
        not raise for invalid input.

        Args:
            file_path: Path to the EPUB file

        Returns:
            EpubValidationResult with validation status and chapter information
        """
        if not file_path:
            return self._failure("No file path provided")

        # Check if file exists
        if not os.path.exists(file_path):
            return self._failure(f"File does not exist: {file_path}")

        # Check file extension
        if not self._is_epub_file(file_path):
            return self._failure(
                "File is not an EPUB file. Please upload a .epub file."
            )

        # Check file size; the file may vanish between the checks above and
        # this call, so an OSError is reported instead of propagating.
        try:
            file_size = os.path.getsize(file_path)
        except OSError as e:
            return self._failure(f"Error reading EPUB file: {str(e)}")
        if file_size == 0:
            return self._failure("File is empty")
        if file_size > self.MAX_FILE_SIZE:
            return self._failure(
                f"File is too large. Maximum size is {self.MAX_FILE_SIZE // (1024*1024)}MB"
            )

        # Try to parse the EPUB
        try:
            return self._parse_epub(file_path)
        except EpubException as e:
            return self._failure(f"Invalid EPUB file: {str(e)}")
        except Exception as e:
            return self._failure(f"Error reading EPUB file: {str(e)}")

    @staticmethod
    def _failure(
        message: str,
        book_title: Optional[str] = None,
        book_author: Optional[str] = None,
    ) -> EpubValidationResult:
        """Build the result returned for any failed validation."""
        return EpubValidationResult(
            is_valid=False,
            error_message=message,
            chapters=[],
            book_title=book_title,
            book_author=book_author,
            total_chapters=0,
        )

    def _is_epub_file(self, file_path: str) -> bool:
        """Check if file has EPUB extension (case-insensitive)."""
        return Path(file_path).suffix.lower() in self.supported_extensions

    def _parse_epub(self, file_path: str) -> EpubValidationResult:
        """Parse EPUB file and extract chapter information.

        Raises:
            EpubException: Propagated unchanged from ebooklib so the caller
                can report the file as an invalid EPUB.
            EpubValidationError: For any other failure; the original
                exception is chained as ``__cause__``.
        """
        try:
            book = epub.read_epub(file_path)

            # Extract book metadata; each entry is indexed as entry[0] for
            # the value. An empty value falls back to the placeholder too.
            book_title = book.get_metadata('DC', 'title')
            book_author = book.get_metadata('DC', 'creator')
            title = (book_title[0][0] if book_title else None) or "Unknown Title"
            author = (book_author[0][0] if book_author else None) or "Unknown Author"

            # Extract chapters
            chapters = self._extract_chapters(book)
            if not chapters:
                return self._failure(
                    "No readable chapters found in EPUB file", title, author
                )

            return EpubValidationResult(
                is_valid=True,
                error_message=None,
                chapters=chapters,
                book_title=title,
                book_author=author,
                total_chapters=len(chapters),
            )
        except EpubException:
            # Let the caller's dedicated handler see library errors.
            raise
        except Exception as e:
            raise EpubValidationError(f"Failed to parse EPUB: {str(e)}") from e

    def _extract_chapters(self, book: epub.EpubBook) -> List[Chapter]:
        """Extract chapters from EPUB book in reading order.

        Documents with fewer than MIN_CHAPTER_LENGTH characters of text are
        skipped; the remaining ones are numbered consecutively from 0.
        """
        chapters = []
        for item in self._reading_order_documents(book):
            content = self._extract_text_content(item)
            if len(content) < self.MIN_CHAPTER_LENGTH:  # Skip very short chapters
                continue
            order = len(chapters)
            chapters.append(
                Chapter(
                    title=self._get_chapter_title(item, order),
                    content=content,
                    file_name=item.get_name(),
                    order=order,
                    preview=self._create_preview(content),
                    word_count=self._count_words(content),
                    estimated_duration=self._estimate_duration(content),
                )
            )
        return chapters

    def _reading_order_documents(self, book) -> list:
        """Return the book's HTML documents, preferring spine order.

        Spine entries may be ``(idref, linear)`` pairs or bare references.
        ebooklib's reader stores ``linear`` as the strings "yes"/"no", so a
        plain truthiness test would never skip non-linear items. When the
        spine yields nothing, all items of the book are considered instead.
        """
        items = []
        for entry in getattr(book, 'spine', None) or []:
            if isinstance(entry, (tuple, list)):
                if not entry:
                    continue
                item_ref = entry[0]
                linear = entry[1] if len(entry) > 1 else "yes"
            else:
                item_ref, linear = entry, "yes"
            if not linear or str(linear).strip().lower() == "no":
                continue
            # A reference is normally an id string; anything else is
            # assumed to already be the item object.
            if isinstance(item_ref, str):
                item = book.get_item_with_id(item_ref)
            else:
                item = item_ref
            if item is not None:
                items.append(item)

        # If no spine items, get all items of the book
        if not items:
            items = list(book.get_items())

        return [
            item for item in items
            if item.get_type() == self.HTML_DOCUMENT_TYPE
        ]

    @staticmethod
    def _decode_content(raw_content) -> str:
        """Turn raw item content into ``str``.

        Bytes are decoded as UTF-8, falling back to latin-1, which maps
        every byte and therefore cannot fail.
        """
        if not isinstance(raw_content, bytes):
            return str(raw_content)
        try:
            return raw_content.decode('utf-8')
        except UnicodeDecodeError:
            return raw_content.decode('latin-1')

    @classmethod
    def _strip_html(cls, markup: str) -> str:
        """Reduce HTML markup to whitespace-normalised plain text."""
        text = cls._HIDDEN_ELEMENT_RE.sub(' ', markup)
        text = cls._BLOCK_TAG_RE.sub(' ', text)
        text = cls._TAG_RE.sub('', text)
        # Unescape after tag removal so that "&lt;b&gt;" is kept as text.
        text = html.unescape(text)
        return cls._WHITESPACE_RE.sub(' ', text).strip()

    def _extract_text_content(self, item) -> str:
        """Extract plain text from an EPUB item.

        Returns an empty string (after printing a warning) when the content
        cannot be read, so that one broken document does not abort the book.
        """
        try:
            return self._strip_html(self._decode_content(item.get_content()))
        except Exception as e:
            print(f"Warning: Could not extract content from {item.get_name()}: {e}")
            return ""

    def _get_chapter_title(self, item, order: int) -> str:
        """Get chapter title from item or generate default.

        Sources are tried in this order: the first non-empty h1-h3 heading
        in the content, the item's ``title`` attribute, the cleaned-up file
        name, and finally ``"Chapter <order + 1>"``.
        """
        # Try to extract title from content. Best effort: a failure here
        # must not prevent the fallbacks below from running.
        try:
            markup = self._decode_content(item.get_content())
            for match in self._HEADING_RE.finditer(markup):
                heading = self._strip_html(match.group(2))
                if heading:
                    return heading
        except Exception:
            pass

        # Try to get title from item metadata
        try:
            item_title = getattr(item, 'title', None)
            if item_title:
                return str(item_title)
        except Exception:
            pass

        # Fallback to file name or default
        file_name = item.get_name()
        if file_name:
            # Clean up file name to make it more readable
            clean_name = Path(file_name).stem
            clean_name = clean_name.replace('_', ' ').replace('-', ' ')
            clean_name = re.sub(r'\d+', '', clean_name)  # Remove numbers
            clean_name = self._WHITESPACE_RE.sub(' ', clean_name).strip()
            if clean_name:
                return clean_name.title()
        return f"Chapter {order + 1}"

    def _create_preview(self, content: str) -> str:
        """Create preview text from chapter content.

        Content longer than MAX_PREVIEW_LENGTH is cut there, then trimmed
        back to the last sentence end if one lies beyond
        MIN_PREVIEW_LENGTH; otherwise "..." is appended.
        """
        if not content:
            return ""
        preview = content.strip()
        if len(preview) <= self.MAX_PREVIEW_LENGTH:
            return preview

        preview = preview[:self.MAX_PREVIEW_LENGTH]
        # Try to end at a sentence boundary
        boundary = max(preview.rfind(mark) for mark in self._SENTENCE_END_MARKS)
        if boundary > self.MIN_PREVIEW_LENGTH:
            return preview[:boundary + 1]
        return preview + "..."

    def _count_words(self, content: str) -> int:
        """Count whitespace-separated words in content."""
        if not content:
            return 0
        return len(content.split())

    def _estimate_duration(self, content: str) -> float:
        """Estimate audio duration in minutes based on content length.

        Uses WORDS_PER_MINUTE as the speech rate, applies DURATION_BUFFER
        and rounds to one decimal place.
        """
        if not content:
            return 0.0
        duration_minutes = self._count_words(content) / self.WORDS_PER_MINUTE
        return round(duration_minutes * self.DURATION_BUFFER, 1)
class EpubProcessor:
    """Main EPUB processor for handling EPUB files in the web interface."""

    def __init__(self):
        self.validator = EpubValidator()
        # Scratch directory for this processor; removed by
        # cleanup_temp_files() or when the instance is destroyed.
        self.temp_dir = tempfile.mkdtemp(prefix="epub_processing_")

    def process_epub_upload(self, file_path: str) -> EpubValidationResult:
        """
        Process an uploaded EPUB file.

        Args:
            file_path: Path to uploaded EPUB file

        Returns:
            EpubValidationResult with validation status and chapter information
        """
        return self.validator.validate_file(file_path)

    def update_chapter_status(self, chapters: List[Chapter], chapter_index: int, status: str, error_message: Optional[str] = None):
        """Update the status of a specific chapter.

        An out-of-range ``chapter_index`` is ignored. A supplied
        ``error_message`` is stored; when none is supplied and the new
        status is not "error", any previous message is cleared so that a
        retried chapter does not keep reporting a stale failure.
        """
        if not 0 <= chapter_index < len(chapters):
            return
        chapter = chapters[chapter_index]
        chapter.status = status
        if error_message:
            chapter.error_message = error_message
        elif status != "error":
            chapter.error_message = None

    def get_chapter_status_summary(self, chapters: List[Chapter]) -> Dict[str, int]:
        """Get a summary of chapter statuses.

        Returns counts for "pending", "processing", "completed" and
        "error"; chapters with any other status are not counted.
        """
        summary = {"pending": 0, "processing": 0, "completed": 0, "error": 0}
        for chapter in chapters:
            if chapter.status in summary:
                summary[chapter.status] += 1
        return summary

    def get_total_estimated_duration(self, chapters: List[Chapter]) -> float:
        """Get total estimated duration for all chapters."""
        return sum(chapter.estimated_duration for chapter in chapters)

    def get_total_word_count(self, chapters: List[Chapter]) -> int:
        """Get total word count for all chapters."""
        return sum(chapter.word_count for chapter in chapters)

    def cleanup_temp_files(self):
        """Clean up temporary files.

        Safe to call repeatedly and on an instance whose ``__init__`` did
        not get as far as creating the directory.
        """
        temp_dir = getattr(self, "temp_dir", None)
        if temp_dir:
            # ignore_errors makes this a best-effort removal.
            shutil.rmtree(temp_dir, ignore_errors=True)

    def __del__(self):
        """Cleanup on destruction."""
        # A destructor must never raise; during interpreter shutdown module
        # globals may already have been torn down.
        try:
            self.cleanup_temp_files()
        except Exception:
            pass