| """ |
| ============================================ |
| CRUD Operations (Create, Read, Update, Delete) |
| All database operations go through here. |
| Includes comprehensive error handling. |
| ============================================ |
| """ |
|
|
| import logging |
| from typing import Optional, List |
| from sqlalchemy.ext.asyncio import AsyncSession |
| from sqlalchemy import select, update, func, delete |
| from sqlalchemy.exc import ( |
| IntegrityError, |
| OperationalError, |
| DisconnectionError, |
| TimeoutError as SATimeoutError, |
| ) |
|
|
| from app.database.models import Novel, Chapter, NovelStatus |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
| |
| |
| |
def handle_db_errors(func):
    """
    Decorator that logs database errors raised by an async CRUD function.

    The wrapped coroutine is awaited; if it raises, the error is logged
    together with the name of the wrapped function and then re-raised
    unchanged. No retry is performed here: any retry has to be done by
    the caller.

    Args:
        func: The coroutine function to wrap.

    Returns:
        An async wrapper that forwards all arguments to ``func`` and keeps
        its metadata (``__name__``, ``__doc__``, ...) via ``functools.wraps``.

    Raises:
        Whatever ``func`` raises; exceptions are never swallowed.
    """
    # Local import keeps this block self-contained.
    from functools import wraps

    # NOTE: inside this decorator the parameter ``func`` shadows the
    # module-level ``sqlalchemy.func`` import; that is harmless here because
    # the decorator never builds SQL expressions.
    @wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except DisconnectionError as e:
            logger.error(f"Database disconnected during {func.__name__}: {e}")
            # The message mentions retrying, but this decorator only re-raises.
            logger.info("This usually happens when Render DB wakes from sleep. Retrying...")
            raise
        except OperationalError as e:
            logger.error(f"Database operational error in {func.__name__}: {e}")
            raise
        except SATimeoutError as e:
            logger.error(f"Database timeout in {func.__name__}: {e}")
            raise
        except IntegrityError as e:
            # Logged at warning level only; callers still receive the error.
            logger.warning(f"Integrity error in {func.__name__}: {e}")
            raise
        except Exception as e:
            logger.error(f"Unexpected DB error in {func.__name__}: {e}")
            raise

    return wrapper
|
|
|
|
| |
| |
| |
@handle_db_errors
async def create_novel(
    db: AsyncSession,
    title: str,
    url: str,
    login_email: Optional[str] = None,
    login_password: Optional[str] = None,
    next_button_selector: Optional[str] = None,
    content_selector: Optional[str] = None,
) -> Novel:
    """
    Insert a new novel row with status ``NovelStatus.QUEUED``.

    Args:
        db: Session the new row is added to (flushed, not committed, here).
        title: Title stored on the novel.
        url: Stored both as ``url`` and as the initial ``current_url``.
        login_email: Optional login e-mail stored on the novel.
        login_password: Optional login password stored on the novel.
        next_button_selector: Applied only when a non-empty value is given.
        content_selector: Applied only when a non-empty value is given.

    Returns:
        The ``Novel`` instance after flush and refresh.
    """
    novel = Novel(
        title=title,
        url=url,
        current_url=url,
        login_email=login_email,
        login_password=login_password,
        status=NovelStatus.QUEUED,
    )

    # Selectors are set only for truthy values; otherwise the attribute is
    # left untouched on the freshly built instance.
    selector_overrides = (
        ("next_button_selector", next_button_selector),
        ("content_selector", content_selector),
    )
    for attribute, selector in selector_overrides:
        if selector:
            setattr(novel, attribute, selector)

    db.add(novel)
    await db.flush()
    # Reload the row so the attributes read below (e.g. ``id``) are populated.
    await db.refresh(novel)

    logger.info(f"Created novel: {novel.title} (ID: {novel.id})")
    return novel
|
|
|
|
@handle_db_errors
async def get_novel_by_id(db: AsyncSession, novel_id: int) -> Optional[Novel]:
    """
    Look up one novel by its ``id``.

    Args:
        db: Session used to run the query.
        novel_id: Value compared against ``Novel.id``.

    Returns:
        The matching ``Novel``, or ``None`` when no row matches.
    """
    query = select(Novel).where(Novel.id == novel_id)
    outcome = await db.execute(query)
    return outcome.scalar_one_or_none()
|
|
|
|
@handle_db_errors
async def get_all_novels(db: AsyncSession) -> List[Novel]:
    """
    Return every novel, sorted by ``created_at`` in descending order.

    Args:
        db: Session used to run the query.

    Returns:
        A list of ``Novel`` objects (empty when there are none).
    """
    query = select(Novel).order_by(Novel.created_at.desc())
    outcome = await db.execute(query)
    novels = outcome.scalars().all()
    return list(novels)
|
|
|
|
@handle_db_errors
async def get_novels_needing_intervention(db: AsyncSession) -> List[Novel]:
    """
    Return novels flagged for intervention while paused on a captcha.

    A novel is selected when ``needs_intervention`` is true AND its status
    is ``NovelStatus.PAUSED_CAPTCHA``.

    Args:
        db: Session used to run the query.

    Returns:
        A list of matching ``Novel`` objects (empty when there are none).
    """
    # ``== True`` is kept on purpose: it is a SQLAlchemy column comparison
    # that builds SQL, not a Python truth test.
    criteria = (
        Novel.needs_intervention == True,  # noqa: E712
        Novel.status == NovelStatus.PAUSED_CAPTCHA,
    )
    query = select(Novel).where(*criteria)
    outcome = await db.execute(query)
    return list(outcome.scalars().all())
|
|
|
|
@handle_db_errors
async def update_novel_status(
    db: AsyncSession,
    novel_id: int,
    status: NovelStatus,
    error_message: Optional[str] = None,
    current_url: Optional[str] = None,
    screenshot_path: Optional[str] = None,
    needs_intervention: Optional[bool] = None,
) -> Optional[Novel]:
    """
    Set a novel's status and, optionally, some related columns.

    ``status`` is always written. Each optional argument is written only
    when it is not ``None``, so passing ``None`` leaves that column as is.

    Args:
        db: Session used for the update and the follow-up read.
        novel_id: Value compared against ``Novel.id``.
        status: New status value.
        error_message: Stored in the ``last_error`` column.
        current_url: Stored in the ``current_url`` column.
        screenshot_path: Stored in the ``screenshot_path`` column.
        needs_intervention: Stored in the ``needs_intervention`` column.

    Returns:
        The novel as returned by ``get_novel_by_id`` after the update, or
        ``None`` when no novel with that id exists.
    """
    optional_columns = {
        "last_error": error_message,
        "current_url": current_url,
        "screenshot_path": screenshot_path,
        "needs_intervention": needs_intervention,
    }

    changes = {"status": status}
    changes.update(
        (column, value)
        for column, value in optional_columns.items()
        if value is not None
    )

    statement = update(Novel).where(Novel.id == novel_id).values(**changes)
    await db.execute(statement)
    await db.flush()

    # Read the row back so the caller gets the novel after the update.
    return await get_novel_by_id(db, novel_id)
|
|
|
|
@handle_db_errors
async def increment_chapter_count(db: AsyncSession, novel_id: int) -> None:
    """
    Add one to the ``chapters_scraped`` column of a novel.

    The new value is expressed as ``Novel.chapters_scraped + 1`` inside the
    UPDATE statement rather than being computed from a value read in Python.

    Args:
        db: Session used to run the update.
        novel_id: Value compared against ``Novel.id``.
    """
    bumped = Novel.chapters_scraped + 1
    statement = (
        update(Novel)
        .where(Novel.id == novel_id)
        .values(chapters_scraped=bumped)
    )
    await db.execute(statement)
    await db.flush()
|
|
|
|
@handle_db_errors
async def delete_novel(db: AsyncSession, novel_id: int) -> bool:
    """
    Delete the novel whose ``id`` equals ``novel_id``.

    NOTE(review): only a DELETE on ``Novel`` is issued here. Removing the
    novel's chapters presumably relies on a cascade configured on the
    model or database, which is not visible in this function; confirm.

    Args:
        db: Session used to run the delete.
        novel_id: Value compared against ``Novel.id``.

    Returns:
        ``True`` when at least one row was deleted, ``False`` otherwise.
    """
    statement = delete(Novel).where(Novel.id == novel_id)
    outcome = await db.execute(statement)
    await db.flush()

    if outcome.rowcount > 0:
        logger.info(f"Deleted novel ID: {novel_id}")
        return True
    return False
|
|
|
|
| |
| |
| |
@handle_db_errors
async def save_chapter(
    db: AsyncSession,
    novel_id: int,
    chapter_number: int,
    content: str,
    title: Optional[str] = None,
    url: Optional[str] = None,
    content_hindi: Optional[str] = None,
    title_hindi: Optional[str] = None,
) -> Chapter:
    """
    Store one chapter (English text plus optional Hindi text) for a novel.

    Per the original contract, saving a chapter number that already exists
    for the novel surfaces as an ``IntegrityError`` for the caller to handle
    (the constraint itself is not visible in this function).

    Args:
        db: Session the chapter is added to (flushed, not committed, here).
        novel_id: Stored as the chapter's ``novel_id``.
        chapter_number: Stored as the chapter's number.
        content: Chapter text; its whitespace-separated words are counted.
        title: Chapter title; falls back to ``"Chapter <number>"`` when empty.
        url: Optional URL stored on the chapter.
        content_hindi: Optional Hindi text.
        title_hindi: Optional Hindi title.

    Returns:
        The ``Chapter`` instance after flush and refresh.
    """
    # Empty or missing content counts as zero words.
    words = content.split() if content else []
    word_count = len(words)

    # An empty or missing title is replaced by a generated one.
    resolved_title = title if title else f"Chapter {chapter_number}"

    chapter = Chapter(
        novel_id=novel_id,
        chapter_number=chapter_number,
        title=resolved_title,
        title_hindi=title_hindi,
        content=content,
        content_hindi=content_hindi,
        url=url,
        word_count=word_count,
    )

    db.add(chapter)
    await db.flush()
    await db.refresh(chapter)

    logger.info(
        f"Saved Chapter {chapter_number} for Novel {novel_id} "
        f"({word_count} words)"
    )
    return chapter
|
|
|
|
@handle_db_errors
async def get_chapters_for_novel(
    db: AsyncSession,
    novel_id: int,
) -> List[Chapter]:
    """
    Return all chapters of a novel, sorted by ascending chapter number.

    Args:
        db: Session used to run the query.
        novel_id: Value compared against ``Chapter.novel_id``.

    Returns:
        A list of ``Chapter`` objects (empty when the novel has none).
    """
    belongs_to_novel = Chapter.novel_id == novel_id
    query = (
        select(Chapter)
        .where(belongs_to_novel)
        .order_by(Chapter.chapter_number.asc())
    )
    outcome = await db.execute(query)
    chapters = outcome.scalars().all()
    return list(chapters)
|
|
|
|
@handle_db_errors
async def get_chapter_count(db: AsyncSession, novel_id: int) -> int:
    """
    Count the chapters stored for a novel.

    Args:
        db: Session used to run the query.
        novel_id: Value compared against ``Chapter.novel_id``.

    Returns:
        The ``COUNT`` of chapter ids, or ``0`` when the query yields a
        falsy value.
    """
    query = select(func.count(Chapter.id)).where(Chapter.novel_id == novel_id)
    outcome = await db.execute(query)
    total = outcome.scalar()
    return total if total else 0
|
|
|
|
@handle_db_errors
async def get_last_chapter_number(db: AsyncSession, novel_id: int) -> int:
    """
    Return the largest chapter number stored for a novel.

    Args:
        db: Session used to run the query.
        novel_id: Value compared against ``Chapter.novel_id``.

    Returns:
        The ``MAX`` of ``chapter_number``, or ``0`` when the query yields a
        falsy value (for example when the novel has no chapters).
    """
    highest_number = func.max(Chapter.chapter_number)
    query = select(highest_number).where(Chapter.novel_id == novel_id)
    outcome = await db.execute(query)
    latest = outcome.scalar()
    return latest if latest else 0
|
|
|
|
@handle_db_errors
async def chapter_exists(
    db: AsyncSession,
    novel_id: int,
    chapter_number: int,
) -> bool:
    """
    Tell whether a given chapter number is already stored for a novel.

    Args:
        db: Session used to run the query.
        novel_id: Value compared against ``Chapter.novel_id``.
        chapter_number: Value compared against ``Chapter.chapter_number``.

    Returns:
        ``True`` when the count of matching chapters is greater than zero.
    """
    criteria = (
        Chapter.novel_id == novel_id,
        Chapter.chapter_number == chapter_number,
    )
    query = select(func.count(Chapter.id)).where(*criteria)
    outcome = await db.execute(query)
    # A falsy scalar is treated as zero matches.
    matches = outcome.scalar() or 0
    return matches > 0
|
|
|
|
@handle_db_errors
async def get_total_word_count(db: AsyncSession, novel_id: int) -> int:
    """
    Sum the ``word_count`` of every chapter of a novel.

    Args:
        db: Session used to run the query.
        novel_id: Value compared against ``Chapter.novel_id``.

    Returns:
        The ``SUM`` of ``word_count``, or ``0`` when the query yields a
        falsy value (for example when the novel has no chapters).
    """
    words_total = func.sum(Chapter.word_count)
    query = select(words_total).where(Chapter.novel_id == novel_id)
    outcome = await db.execute(query)
    total = outcome.scalar()
    return total if total else 0