# Uploaded by Ruhivig65 ("Upload 4 files", commit e29427f, verified).
"""
============================================
CRUD Operations (Create, Read, Update, Delete)
All database operations go through here.
Includes comprehensive error handling.
============================================
"""
import functools
import logging
from typing import Optional, List

from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, update, func, delete
from sqlalchemy.exc import (
    IntegrityError,
    OperationalError,
    DisconnectionError,
    TimeoutError as SATimeoutError,
)

from app.database.models import Novel, Chapter, NovelStatus
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
# ============================================
# Error-Logging Decorator for DB Operations (logs and re-raises; no retry)
# ============================================
def handle_db_errors(func):
    """
    Decorator that logs database errors raised by an async CRUD function.

    The wrapped coroutine is awaited as-is. If it raises, the exception is
    logged together with the wrapped function's name and then re-raised
    unchanged, so callers still see the original exception type.

    This decorator does NOT retry the operation; any retry policy has to
    live in the caller.

    Args:
        func: The coroutine function to wrap. Inside this decorator the
            parameter name shadows the module-level SQLAlchemy ``func``
            import; that import is not used here.

    Returns:
        An async wrapper that keeps ``func``'s ``__name__``, ``__doc__`` and
        other metadata, so logs and introspection show the real function
        name instead of ``wrapper``.

    Raises:
        Whatever ``func`` raises; nothing is swallowed.
    """
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        try:
            return await func(*args, **kwargs)
        except DisconnectionError as e:
            # Connection was dropped; logged at error level, then propagated.
            logger.error(f"Database disconnected during {func.__name__}: {e}")
            logger.info("This usually happens when Render DB wakes from sleep. Retrying...")
            raise
        except OperationalError as e:
            logger.error(f"Database operational error in {func.__name__}: {e}")
            raise
        except SATimeoutError as e:
            logger.error(f"Database timeout in {func.__name__}: {e}")
            raise
        except IntegrityError as e:
            # Constraint violations are logged as warnings, not errors.
            logger.warning(f"Integrity error in {func.__name__}: {e}")
            raise
        except Exception as e:
            # Catch-all so nothing passes through unlogged; still re-raised.
            logger.error(f"Unexpected DB error in {func.__name__}: {e}")
            raise
    return wrapper
# ============================================
# Novel CRUD
# ============================================
@handle_db_errors
async def create_novel(
    db: AsyncSession,
    title: str,
    url: str,
    login_email: Optional[str] = None,
    login_password: Optional[str] = None,
    next_button_selector: Optional[str] = None,
    content_selector: Optional[str] = None,
) -> Novel:
    """
    Insert a new Novel row in the QUEUED state and return it.

    ``url`` is stored both as the novel's ``url`` and as its initial
    ``current_url``. The two selector arguments are applied only when they
    are truthy; otherwise the model's own values are left untouched.

    The session is flushed (not committed) and the instance refreshed, so
    the returned object carries its database-assigned ID.
    """
    novel = Novel(
        title=title,
        url=url,
        current_url=url,
        login_email=login_email,
        login_password=login_password,
        status=NovelStatus.QUEUED,
    )

    # Only override the selectors the caller actually supplied.
    selector_overrides = (
        ("next_button_selector", next_button_selector),
        ("content_selector", content_selector),
    )
    for attribute, selector in selector_overrides:
        if selector:
            setattr(novel, attribute, selector)

    db.add(novel)
    # Flush so the ID is assigned without committing the transaction.
    await db.flush()
    await db.refresh(novel)

    logger.info(f"Created novel: {novel.title} (ID: {novel.id})")
    return novel
@handle_db_errors
async def get_novel_by_id(db: AsyncSession, novel_id: int) -> Optional[Novel]:
    """
    Fetch the novel whose ID equals ``novel_id``.

    Returns:
        The matching Novel, or None when no row matches.
    """
    lookup = select(Novel).where(Novel.id == novel_id)
    rows = await db.execute(lookup)
    return rows.scalar_one_or_none()
@handle_db_errors
async def get_all_novels(db: AsyncSession) -> List[Novel]:
    """
    Return every novel, ordered by ``created_at`` descending.

    Returns:
        A list of Novel objects (empty when there are none).
    """
    newest_first = select(Novel).order_by(Novel.created_at.desc())
    rows = await db.execute(newest_first)
    novels = rows.scalars().all()
    return list(novels)
@handle_db_errors
async def get_novels_needing_intervention(db: AsyncSession) -> List[Novel]:
    """
    Return novels flagged for manual intervention while paused on a captcha.

    A novel matches only if ``needs_intervention`` is true AND its status
    is ``NovelStatus.PAUSED_CAPTCHA``.
    """
    # `== True` builds a SQL comparison; it is not a Python truthiness test.
    flagged = Novel.needs_intervention == True  # noqa: E712
    paused_on_captcha = Novel.status == NovelStatus.PAUSED_CAPTCHA

    query = select(Novel).where(flagged, paused_on_captcha)
    rows = await db.execute(query)
    return list(rows.scalars().all())
@handle_db_errors
async def update_novel_status(
    db: AsyncSession,
    novel_id: int,
    status: NovelStatus,
    error_message: Optional[str] = None,
    current_url: Optional[str] = None,
    screenshot_path: Optional[str] = None,
    needs_intervention: Optional[bool] = None,
) -> Optional[Novel]:
    """
    Set a novel's status and, optionally, some related columns.

    ``status`` is always written. Each optional argument is written only
    when it is not None; ``error_message`` is stored in the ``last_error``
    column. The change is flushed, not committed.

    Returns:
        The novel as re-read through ``get_novel_by_id`` after the update,
        or None if no novel has this ID.
    """
    # Column name -> candidate value; None means "leave the column alone".
    optional_columns = {
        "last_error": error_message,
        "current_url": current_url,
        "screenshot_path": screenshot_path,
        "needs_intervention": needs_intervention,
    }
    changes = {"status": status}
    for column, value in optional_columns.items():
        if value is not None:
            changes[column] = value

    statement = update(Novel).where(Novel.id == novel_id).values(**changes)
    await db.execute(statement)
    await db.flush()

    # Hand back the freshly updated row.
    return await get_novel_by_id(db, novel_id)
@handle_db_errors
async def increment_chapter_count(db: AsyncSession, novel_id: int) -> None:
    """
    Add one to ``chapters_scraped`` for the given novel.

    The increment is expressed as a column expression in the UPDATE
    statement rather than computed in Python. The change is flushed, not
    committed.
    """
    bump_counter = (
        update(Novel)
        .where(Novel.id == novel_id)
        .values(chapters_scraped=Novel.chapters_scraped + 1)
    )
    await db.execute(bump_counter)
    await db.flush()
@handle_db_errors
async def delete_novel(db: AsyncSession, novel_id: int) -> bool:
    """
    Delete the novel with the given ID.

    NOTE(review): the chapters are expected to be removed by a cascade, but
    the cascade configuration is not visible in this module - confirm it
    against the models/schema.

    Returns:
        True if at least one row was deleted, False otherwise.
    """
    removal = delete(Novel).where(Novel.id == novel_id)
    outcome = await db.execute(removal)
    await db.flush()

    if outcome.rowcount > 0:
        logger.info(f"Deleted novel ID: {novel_id}")
        return True
    return False
# ============================================
# Chapter CRUD
# ============================================
@handle_db_errors
async def save_chapter(
    db: AsyncSession,
    novel_id: int,
    chapter_number: int,
    content: str,
    title: Optional[str] = None,
    url: Optional[str] = None,
    content_hindi: Optional[str] = None,
    title_hindi: Optional[str] = None,
) -> Chapter:
    """
    Persist one chapter (English text plus optional Hindi text).

    When ``title`` is empty or None it defaults to ``"Chapter <number>"``.
    ``word_count`` is the number of whitespace-separated tokens in
    ``content`` (0 for empty content). The row is flushed and refreshed,
    not committed.

    NOTE(review): a duplicate chapter number for the same novel is expected
    to raise IntegrityError for the caller to handle; that relies on a
    uniqueness constraint not visible in this module - confirm it.
    """
    if content:
        word_count = len(content.split())
    else:
        word_count = 0

    # Fall back to a generated title when none was supplied.
    chapter_title = title if title else f"Chapter {chapter_number}"

    chapter = Chapter(
        novel_id=novel_id,
        chapter_number=chapter_number,
        title=chapter_title,
        title_hindi=title_hindi,
        content=content,
        content_hindi=content_hindi,
        url=url,
        word_count=word_count,
    )
    db.add(chapter)
    await db.flush()
    await db.refresh(chapter)

    logger.info(
        f"Saved Chapter {chapter_number} for Novel {novel_id} "
        f"({word_count} words)"
    )
    return chapter
@handle_db_errors
async def get_chapters_for_novel(
    db: AsyncSession,
    novel_id: int,
) -> List[Chapter]:
    """
    Return all chapters of a novel, sorted by ascending chapter number.

    Returns:
        A list of Chapter objects (empty when the novel has none).
    """
    belongs_to_novel = Chapter.novel_id == novel_id
    in_reading_order = Chapter.chapter_number.asc()

    query = select(Chapter).where(belongs_to_novel).order_by(in_reading_order)
    rows = await db.execute(query)
    return list(rows.scalars().all())
@handle_db_errors
async def get_chapter_count(db: AsyncSession, novel_id: int) -> int:
    """
    Count the chapter rows stored for a novel.

    Returns:
        The count, or 0 when the query yields a falsy value.
    """
    counting_query = select(func.count(Chapter.id)).where(
        Chapter.novel_id == novel_id
    )
    rows = await db.execute(counting_query)
    total = rows.scalar()
    return total or 0
@handle_db_errors
async def get_last_chapter_number(db: AsyncSession, novel_id: int) -> int:
    """
    Return the largest ``chapter_number`` stored for a novel.

    Returns:
        The maximum chapter number, or 0 when the query yields no value
        (for example, when the novel has no chapters yet).
    """
    highest_query = select(func.max(Chapter.chapter_number)).where(
        Chapter.novel_id == novel_id
    )
    rows = await db.execute(highest_query)
    highest = rows.scalar()
    return highest or 0
@handle_db_errors
async def chapter_exists(
    db: AsyncSession,
    novel_id: int,
    chapter_number: int,
) -> bool:
    """
    Report whether a chapter with this number is stored for the novel.

    Returns:
        True when at least one matching row exists, otherwise False.
    """
    same_novel = Chapter.novel_id == novel_id
    same_number = Chapter.chapter_number == chapter_number

    counting_query = select(func.count(Chapter.id)).where(same_novel, same_number)
    rows = await db.execute(counting_query)
    matches = rows.scalar() or 0
    return matches > 0
@handle_db_errors
async def get_total_word_count(db: AsyncSession, novel_id: int) -> int:
    """
    Sum ``word_count`` over every chapter of a novel.

    Returns:
        The summed word count, or 0 when the query yields no value
        (for example, when the novel has no chapters).
    """
    summing_query = select(func.sum(Chapter.word_count)).where(
        Chapter.novel_id == novel_id
    )
    rows = await db.execute(summing_query)
    total_words = rows.scalar()
    return total_words or 0