"""
database.py - PostgreSQL database models and CRUD operations.

Handles glossary storage, novel tracking, and chapter management.
"""

import json
import logging
from datetime import datetime
from typing import List, Dict, Iterator, Optional, Tuple
from contextlib import contextmanager

from sqlalchemy import (
    create_engine, Column, Integer, String, Text, DateTime, Boolean, ForeignKey,
    UniqueConstraint, Index, Enum as SAEnum, event, text as sa_text
)
from sqlalchemy.orm import (
    declarative_base, sessionmaker, relationship, Session
)
from sqlalchemy.pool import QueuePool
from sqlalchemy.exc import IntegrityError, OperationalError

from config import DATABASE_URL

logger = logging.getLogger(__name__)


# ─── Engine & Session Setup ─────────────────────────────────────────────────────

engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=5,
    max_overflow=10,
    pool_timeout=30,
    pool_recycle=1800,   # recycle pooled connections older than 1800 seconds
    pool_pre_ping=True,  # check a connection is alive when it is checked out
    echo=False,
)

# expire_on_commit=False prevents DetachedInstanceError when callers read
# already-loaded attributes of an object after its session has been closed.
SessionLocal = sessionmaker(
    bind=engine, autocommit=False, autoflush=False, expire_on_commit=False
)

Base = declarative_base()


# ─── Models ─────────────────────────────────────────────────────────────────────

class Novel(Base):
    """Tracks each uploaded novel."""

    __tablename__ = "novels"

    id = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(500), nullable=False)
    original_filename = Column(String(500), nullable=False)
    mc_original_name = Column(String(200), nullable=True)
    mc_indian_name = Column(String(200), nullable=True)
    total_chapters = Column(Integer, default=0)
    processed_chapters = Column(Integer, default=0)
    status = Column(
        String(50), default="uploaded"
    )  # uploaded, processing, completed, error
    selected_model = Column(String(200), nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Relationships
    glossary_entries = relationship(
        "GlossaryEntry", back_populates="novel", cascade="all, delete-orphan"
    )
    chapters = relationship(
        "Chapter", back_populates="novel", cascade="all, delete-orphan"
    )
    output_files = relationship(
        "OutputFile", back_populates="novel", cascade="all, delete-orphan"
    )

    def __repr__(self):
        # Previously returned an empty string, which made log lines and
        # debugger output useless.
        return (
            f"<Novel(id={self.id}, title={self.title!r}, "
            f"status={self.status!r})>"
        )


class GlossaryEntry(Base):
    """Stores name/term mappings for consistency."""

    __tablename__ = "glossary_entries"

    id = Column(Integer, primary_key=True, autoincrement=True)
    novel_id = Column(Integer, ForeignKey("novels.id", ondelete="CASCADE"), nullable=False)
    original_name = Column(String(500), nullable=False)
    indian_name = Column(String(500), nullable=False)
    entry_type = Column(
        String(50), default="character"
    )  # character, place, sect, item, technique, other
    first_seen_chapter = Column(Integer, default=1)
    usage_count = Column(Integer, default=1)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationships
    novel = relationship("Novel", back_populates="glossary_entries")

    __table_args__ = (
        UniqueConstraint("novel_id", "original_name", name="uq_novel_original_name"),
        Index("idx_novel_glossary", "novel_id"),
        Index("idx_original_name", "original_name"),
    )

    def __repr__(self):
        return (
            f"<GlossaryEntry(id={self.id}, novel_id={self.novel_id}, "
            f"original_name={self.original_name!r}, "
            f"indian_name={self.indian_name!r}, type={self.entry_type!r})>"
        )


class Chapter(Base):
    """Tracks individual chapters and their processing status."""

    __tablename__ = "chapters"

    id = Column(Integer, primary_key=True, autoincrement=True)
    novel_id = Column(Integer, ForeignKey("novels.id", ondelete="CASCADE"), nullable=False)
    chapter_number = Column(Integer, nullable=False)
    original_title = Column(String(1000), nullable=True)
    original_text = Column(Text, nullable=False)
    translated_text = Column(Text, nullable=True)
    is_processed = Column(Boolean, default=False)
    error_message = Column(Text, nullable=True)
    retry_count = Column(Integer, default=0)
    created_at = Column(DateTime, default=datetime.utcnow)
    processed_at = Column(DateTime, nullable=True)

    # Relationships
    novel = relationship("Novel", back_populates="chapters")

    __table_args__ = (
        UniqueConstraint("novel_id", "chapter_number", name="uq_novel_chapter"),
        Index("idx_novel_chapter", "novel_id", "chapter_number"),
    )

    def __repr__(self):
        # Text columns are deliberately left out: they can be very large.
        return (
            f"<Chapter(id={self.id}, novel_id={self.novel_id}, "
            f"chapter_number={self.chapter_number}, "
            f"is_processed={self.is_processed})>"
        )


class OutputFile(Base):
    """Tracks generated output files (every 20 chapters)."""

    __tablename__ = "output_files"

    id = Column(Integer, primary_key=True, autoincrement=True)
    novel_id = Column(Integer, ForeignKey("novels.id", ondelete="CASCADE"), nullable=False)
    filename = Column(String(500), nullable=False)
    chapter_start = Column(Integer, nullable=False)
    chapter_end = Column(Integer, nullable=False)
    file_content = Column(Text, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Relationships
    novel = relationship("Novel", back_populates="output_files")

    __table_args__ = (
        Index("idx_novel_output", "novel_id"),
    )


# ─── Database Initialization ────────────────────────────────────────────────────

def init_database():
    """Create all tables if they don't exist.

    Raises:
        Exception: whatever ``create_all`` raised, re-raised after logging.
    """
    try:
        Base.metadata.create_all(bind=engine)
        logger.info("Database tables created/verified successfully.")
    except Exception as e:
        logger.error("Failed to initialize database: %s", e)
        raise


@contextmanager
def get_db_session() -> Iterator[Session]:
    """Yield a session that is committed on success and always closed.

    The session is committed when the ``with`` body finishes normally. If the
    body (or the commit itself) raises, the transaction is rolled back and the
    exception propagates. The session is closed in every case.
    """
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


# ─── Novel CRUD ─────────────────────────────────────────────────────────────────

def create_novel(
    title: str,
    original_filename: str,
    mc_original_name: Optional[str] = None,
    mc_indian_name: Optional[str] = None,
    selected_model: Optional[str] = None,
) -> Novel:
    """Create a new novel record.

    Returns:
        The persisted ``Novel``. It is detached once this function returns;
        its column attributes stay readable because the session factory uses
        ``expire_on_commit=False``.
    """
    with get_db_session() as session:
        novel = Novel(
            title=title,
            original_filename=original_filename,
            mc_original_name=mc_original_name,
            mc_indian_name=mc_indian_name,
            selected_model=selected_model,
        )
        session.add(novel)
        session.flush()  # Ensure it gets an ID assigned
        session.refresh(novel)  # Populate any defaults
        return novel


def get_novel(novel_id: int) -> Optional[Novel]:
    """Get a novel by ID, or ``None`` if no such novel exists.

    The returned object is detached from its session; relationship attributes
    that were not loaded cannot be lazy-loaded afterwards.
    """
    with get_db_session() as session:
        return session.get(Novel, novel_id)


def get_all_novels() -> List[Dict]:
    """Get all novels as dicts, newest first (by ``created_at``)."""
    with get_db_session() as session:
        novels = session.query(Novel).order_by(Novel.created_at.desc()).all()
        return [
            {
                "id": n.id,
                "title": n.title,
                "original_filename": n.original_filename,
                "mc_original_name": n.mc_original_name,
                "mc_indian_name": n.mc_indian_name,
                "total_chapters": n.total_chapters,
                "processed_chapters": n.processed_chapters,
                "status": n.status,
                "selected_model": n.selected_model,
                "created_at": str(n.created_at),
            }
            for n in novels
        ]


def update_novel_status(novel_id: int, status: str, processed_chapters: Optional[int] = None):
    """Update novel processing status.

    ``processed_chapters`` is only written when it is not ``None``.
    Does nothing if the novel does not exist.
    """
    with get_db_session() as session:
        novel = session.get(Novel, novel_id)
        if novel is None:
            return
        novel.status = status
        novel.updated_at = datetime.utcnow()
        if processed_chapters is not None:
            novel.processed_chapters = processed_chapters


def update_novel_chapter_count(novel_id: int, total_chapters: int):
    """Update total chapter count for a novel.

    Does nothing if the novel does not exist.
    """
    with get_db_session() as session:
        novel = session.get(Novel, novel_id)
        if novel is None:
            return
        novel.total_chapters = total_chapters
        novel.updated_at = datetime.utcnow()


def delete_novel(novel_id: int) -> bool:
    """Delete a novel and all associated data (cascading).

    Returns:
        ``True`` if the novel existed and was deleted, ``False`` otherwise.
    """
    with get_db_session() as session:
        novel = session.get(Novel, novel_id)
        if novel is None:
            return False
        session.delete(novel)
        logger.info("Deleted novel ID %s and all associated data.", novel_id)
        return True


# ─── Chapter CRUD ────────────────────────────────────────────────────────────────

def store_chapters(novel_id: int, chapters: List[Dict]):
    """Store parsed chapters in database.

    Each dict must provide ``"chapter_number"`` and ``"text"``; ``"title"`` is
    optional and defaults to an empty string. All chapters are written in one
    transaction, so a failure on any of them stores none.
    """
    with get_db_session() as session:
        session.add_all(
            Chapter(
                novel_id=novel_id,
                chapter_number=ch["chapter_number"],
                original_title=ch.get("title", ""),
                original_text=ch["text"],
            )
            for ch in chapters
        )
        session.flush()


def get_unprocessed_chapters(novel_id: int, limit: int = 20) -> List[Dict]:
    """Get next batch of unprocessed chapters, ordered by chapter number."""
    with get_db_session() as session:
        chapters = (
            session.query(Chapter)
            .filter(Chapter.novel_id == novel_id, Chapter.is_processed.is_(False))
            .order_by(Chapter.chapter_number)
            .limit(limit)
            .all()
        )
        return [
            {
                "id": ch.id,
                "chapter_number": ch.chapter_number,
                "original_title": ch.original_title,
                "original_text": ch.original_text,
                "retry_count": ch.retry_count,
            }
            for ch in chapters
        ]


def save_translated_chapter(chapter_id: int, translated_text: str):
    """Save translated text for a chapter and mark it processed.

    Does nothing if the chapter does not exist.
    """
    with get_db_session() as session:
        chapter = session.get(Chapter, chapter_id)
        if chapter is None:
            return
        chapter.translated_text = translated_text
        chapter.is_processed = True
        chapter.processed_at = datetime.utcnow()


def mark_chapter_error(chapter_id: int, error_message: str):
    """Record an error on a chapter and increment its retry counter.

    Does nothing if the chapter does not exist.
    """
    with get_db_session() as session:
        chapter = session.get(Chapter, chapter_id)
        if chapter is None:
            return
        chapter.error_message = error_message
        # retry_count is a nullable column; treat NULL as zero instead of
        # raising TypeError on ``None + 1``.
        chapter.retry_count = (chapter.retry_count or 0) + 1


def get_translated_chapters_range(
    novel_id: int, start_chapter: int, end_chapter: int
) -> List[Dict]:
    """Get processed chapters with numbers in ``[start_chapter, end_chapter]``.

    Both bounds are inclusive; results are ordered by chapter number.
    """
    with get_db_session() as session:
        chapters = (
            session.query(Chapter)
            .filter(
                Chapter.novel_id == novel_id,
                Chapter.chapter_number >= start_chapter,
                Chapter.chapter_number <= end_chapter,
                Chapter.is_processed.is_(True),
            )
            .order_by(Chapter.chapter_number)
            .all()
        )
        return [
            {
                "chapter_number": ch.chapter_number,
                "original_title": ch.original_title,
                "translated_text": ch.translated_text,
            }
            for ch in chapters
        ]


def get_processed_chapter_count(novel_id: int) -> int:
    """Get count of processed chapters."""
    with get_db_session() as session:
        return (
            session.query(Chapter)
            .filter(Chapter.novel_id == novel_id, Chapter.is_processed.is_(True))
            .count()
        )


# ─── Glossary CRUD ──────────────────────────────────────────────────────────────
def add_glossary_entry(
    novel_id: int,
    original_name: str,
    indian_name: str,
    entry_type: str = "character",
    chapter_number: int = 1,
) -> Optional[GlossaryEntry]:
    """Add a glossary entry, or bump the usage count of an existing one.

    Names are stripped of surrounding whitespace and the entry type is
    lower-cased before storage.

    Returns:
        The new or already-existing ``GlossaryEntry``. ``None`` if either name
        is blank (nothing is stored), or if the insert hit the unique
        constraint on (novel, original name).
    """
    original = (original_name or "").strip()
    indian = (indian_name or "").strip()
    kind = (entry_type or "character").strip().lower()

    # A blank mapping is useless for consistency and would be rendered as an
    # empty "  -  → " line in the glossary text, so refuse to store it.
    if not original or not indian:
        logger.warning(
            "Skipping glossary entry with empty name (original=%r, indian=%r)",
            original_name,
            indian_name,
        )
        return None

    with get_db_session() as session:
        try:
            existing = (
                session.query(GlossaryEntry)
                .filter(
                    GlossaryEntry.novel_id == novel_id,
                    GlossaryEntry.original_name == original,
                )
                .first()
            )
            if existing:
                # usage_count is nullable; treat NULL as zero.
                existing.usage_count = (existing.usage_count or 0) + 1
                return existing

            entry = GlossaryEntry(
                novel_id=novel_id,
                original_name=original,
                indian_name=indian,
                entry_type=kind,
                first_seen_chapter=chapter_number,
            )
            session.add(entry)
            session.flush()
            logger.info("Added glossary: '%s' -> '%s' (%s)", original, indian, kind)
            return entry
        except IntegrityError:
            # The unique constraint rejected the row (e.g. inserted between
            # the lookup above and the flush); discard it.
            session.rollback()
            logger.warning("Duplicate glossary entry ignored: '%s'", original)
            return None


def add_glossary_entries_bulk(
    novel_id: int, mappings: List[Dict], chapter_number: int = 1
):
    """Add multiple glossary entries at once.

    Each mapping may carry ``"original"``, ``"indian"`` and ``"type"`` keys.
    A missing or empty type falls back to ``"other"``. Every entry is stored
    through ``add_glossary_entry`` in its own transaction, so one rejected
    entry does not prevent the others from being saved.
    """
    for mapping in mappings:
        add_glossary_entry(
            novel_id=novel_id,
            original_name=mapping.get("original") or "",
            indian_name=mapping.get("indian") or "",
            entry_type=mapping.get("type") or "other",
            chapter_number=chapter_number,
        )


def get_glossary(novel_id: int, limit: int = 300) -> List[Dict]:
    """Get glossary entries for a novel as dicts.

    Entries are ordered by usage count (highest first), then by the chapter
    in which they were first seen (earliest first). At most ``limit`` entries
    are returned.
    """
    with get_db_session() as session:
        entries = (
            session.query(GlossaryEntry)
            .filter(GlossaryEntry.novel_id == novel_id)
            .order_by(
                GlossaryEntry.usage_count.desc(),
                GlossaryEntry.first_seen_chapter.asc(),
            )
            .limit(limit)
            .all()
        )
        return [
            {
                "original": e.original_name,
                "indian": e.indian_name,
                "type": e.entry_type,
                "usage_count": e.usage_count,
            }
            for e in entries
        ]


def get_glossary_as_text(novel_id: int, limit: int = 300) -> str:
    """Get glossary formatted as text for prompt injection.

    Entries are grouped by type under a ``### <label>:`` heading, one
    ``original → indian`` line per entry. Groups appear in the order their
    type is first encountered in ``get_glossary``'s result. If the novel has
    no entries, a fixed placeholder sentence is returned instead.
    """
    entries = get_glossary(novel_id, limit)
    if not entries:
        return "अभी तक कोई glossary entry नहीं है।"

    type_labels = {
        "character": "पात्र (Characters)",
        "place": "स्थान (Places)",
        "sect": "संप्रदाय/संगठन (Sects/Organizations)",
        "item": "वस्तुएं (Items)",
        "technique": "तकनीक/कला (Techniques)",
        "other": "अन्य (Others)",
    }

    # dicts preserve insertion order, so groups keep first-seen order.
    by_type: Dict[str, List[Dict]] = {}
    for e in entries:
        # entry_type is a nullable column; file untyped rows under "other"
        # rather than crashing on ``None.title()`` below.
        by_type.setdefault(e["type"] or "other", []).append(e)

    lines = []
    for entry_type, items in by_type.items():
        label = type_labels.get(entry_type, entry_type.title())
        lines.append(f"\n### {label}:")
        lines.extend(
            f"  - {item['original']} → {item['indian']}" for item in items
        )

    return "\n".join(lines)


def get_glossary_count(novel_id: int) -> int:
    """Get total glossary entries for a novel."""
    with get_db_session() as session:
        return (
            session.query(GlossaryEntry)
            .filter(GlossaryEntry.novel_id == novel_id)
            .count()
        )


# ─── Output File CRUD ───────────────────────────────────────────────────────────

def save_output_file(
    novel_id: int,
    filename: str,
    chapter_start: int,
    chapter_end: int,
    file_content: str,
) -> OutputFile:
    """Save a generated output file and return the persisted record."""
    with get_db_session() as session:
        output = OutputFile(
            novel_id=novel_id,
            filename=filename,
            chapter_start=chapter_start,
            chapter_end=chapter_end,
            file_content=file_content,
        )
        session.add(output)
        session.flush()  # assigns the primary key before the object is returned
        return output


def get_output_files(novel_id: int) -> List[Dict]:
    """Get metadata for all output files of a novel, ordered by start chapter.

    The file content itself is not included; use ``get_output_file_content``.
    """
    with get_db_session() as session:
        files = (
            session.query(OutputFile)
            .filter(OutputFile.novel_id == novel_id)
            .order_by(OutputFile.chapter_start)
            .all()
        )
        return [
            {
                "id": f.id,
                "filename": f.filename,
                "chapter_start": f.chapter_start,
                "chapter_end": f.chapter_end,
                "created_at": str(f.created_at),
            }
            for f in files
        ]


def get_output_file_content(file_id: int) -> Optional[str]:
    """Get content of a specific output file, or ``None`` if it does not exist."""
    with get_db_session() as session:
        output = session.get(OutputFile, file_id)
        return output.file_content if output else None