"""
database.py - PostgreSQL database models and CRUD operations.

Handles glossary storage, novel tracking, and chapter management.
"""
|
|
import json
import logging
from contextlib import contextmanager
from datetime import datetime
from typing import Dict, Iterator, List, Optional, Tuple

from sqlalchemy import (
    Boolean,
    Column,
    DateTime,
    Enum as SAEnum,
    ForeignKey,
    Index,
    Integer,
    String,
    Text,
    UniqueConstraint,
    create_engine,
    event,
    text as sa_text,
)
from sqlalchemy.exc import IntegrityError, OperationalError
from sqlalchemy.orm import Session, declarative_base, relationship, sessionmaker
from sqlalchemy.pool import QueuePool

from config import DATABASE_URL
|
|
logger = logging.getLogger(__name__)

# Engine options gathered in one mapping so the pool tuning is easy to scan.
_ENGINE_OPTIONS = {
    "poolclass": QueuePool,
    "pool_size": 5,          # connections kept open in the pool
    "max_overflow": 10,      # extra connections allowed beyond pool_size
    "pool_timeout": 30,      # seconds to wait for a free connection
    "pool_recycle": 1800,    # seconds after which a connection is replaced
    "pool_pre_ping": True,   # test a connection before handing it out
    "echo": False,           # no SQL statement logging
}

# Single module-wide engine bound to the configured DATABASE_URL.
engine = create_engine(DATABASE_URL, **_ENGINE_OPTIONS)

# Session factory used by get_db_session(). expire_on_commit=False keeps the
# loaded attribute values on ORM objects after commit, which the helpers that
# return model instances from a closed session rely on.
SessionLocal = sessionmaker(
    bind=engine,
    autocommit=False,
    autoflush=False,
    expire_on_commit=False,
)

# Declarative base class shared by every model in this module.
Base = declarative_base()
|
|
|
|
| |
|
|
class Novel(Base):
    """One uploaded novel together with its processing progress.

    A novel owns its glossary entries, chapters and output files; all three
    relationships use the ``all, delete-orphan`` cascade, so removing a novel
    through the ORM removes its children as well.
    """

    __tablename__ = "novels"

    # Identity and source file.
    id = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(500), nullable=False)
    original_filename = Column(String(500), nullable=False)

    # Optional name pair; presumably the main character's original name and
    # its Indian replacement -- confirm against the caller.
    mc_original_name = Column(String(200), nullable=True)
    mc_indian_name = Column(String(200), nullable=True)

    # Progress counters; both start at 0.
    total_chapters = Column(Integer, default=0)
    processed_chapters = Column(Integer, default=0)

    # Free-form status string; new rows start as "uploaded".
    status = Column(String(50), default="uploaded")
    selected_model = Column(String(200), nullable=True)

    # Naive UTC timestamps; updated_at is refreshed on every UPDATE.
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Child collections.
    glossary_entries = relationship(
        "GlossaryEntry",
        back_populates="novel",
        cascade="all, delete-orphan",
    )
    chapters = relationship(
        "Chapter",
        back_populates="novel",
        cascade="all, delete-orphan",
    )
    output_files = relationship(
        "OutputFile",
        back_populates="novel",
        cascade="all, delete-orphan",
    )

    def __repr__(self):
        """Return a short debug representation with id, title and status."""
        return f"<Novel(id={self.id}, title='{self.title}', status='{self.status}')>"
|
|
|
|
class GlossaryEntry(Base):
    """A name/term mapping that keeps translations of one novel consistent.

    Each row maps ``original_name`` to ``indian_name`` for a single novel;
    the pair (novel_id, original_name) is unique.
    """

    __tablename__ = "glossary_entries"
    __table_args__ = (
        # One mapping per original name within a novel.
        UniqueConstraint("novel_id", "original_name", name="uq_novel_original_name"),
        Index("idx_novel_glossary", "novel_id"),
        Index("idx_original_name", "original_name"),
    )

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Owning novel; the database removes entries when the novel row goes.
    novel_id = Column(Integer, ForeignKey("novels.id", ondelete="CASCADE"), nullable=False)
    original_name = Column(String(500), nullable=False)
    indian_name = Column(String(500), nullable=False)
    # Category of the term; defaults to "character".
    entry_type = Column(String(50), default="character")
    first_seen_chapter = Column(Integer, default=1)
    # Starts at 1 and is incremented when the same name is added again.
    usage_count = Column(Integer, default=1)
    created_at = Column(DateTime, default=datetime.utcnow)

    novel = relationship("Novel", back_populates="glossary_entries")

    def __repr__(self):
        """Return a short debug representation of the mapping."""
        return (
            f"<GlossaryEntry(original='{self.original_name}', "
            f"indian='{self.indian_name}', type='{self.entry_type}')>"
        )
|
|
|
|
class Chapter(Base):
    """One chapter of a novel plus its translation state.

    Holds the original text, the translated text once available, and the
    bookkeeping (processed flag, last error, retry counter) used while
    chapters are being processed.
    """

    __tablename__ = "chapters"
    __table_args__ = (
        # A chapter number may appear only once per novel.
        UniqueConstraint("novel_id", "chapter_number", name="uq_novel_chapter"),
        # NOTE(review): covers the same columns as the unique constraint
        # above, so it is probably redundant on PostgreSQL -- confirm before
        # dropping it in a migration.
        Index("idx_novel_chapter", "novel_id", "chapter_number"),
    )

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Owning novel; the database removes chapters when the novel row goes.
    novel_id = Column(Integer, ForeignKey("novels.id", ondelete="CASCADE"), nullable=False)
    chapter_number = Column(Integer, nullable=False)
    original_title = Column(String(1000), nullable=True)
    original_text = Column(Text, nullable=False)
    # Empty until the chapter has been translated.
    translated_text = Column(Text, nullable=True)
    is_processed = Column(Boolean, default=False)
    # Last recorded failure and how many failures were recorded.
    error_message = Column(Text, nullable=True)
    retry_count = Column(Integer, default=0)
    created_at = Column(DateTime, default=datetime.utcnow)
    processed_at = Column(DateTime, nullable=True)

    novel = relationship("Novel", back_populates="chapters")

    def __repr__(self):
        """Return a short debug representation of the chapter."""
        return (
            f"<Chapter(novel_id={self.novel_id}, "
            f"chapter={self.chapter_number}, processed={self.is_processed})>"
        )
|
|
|
|
class OutputFile(Base):
    """A generated output file covering a range of chapters of one novel.

    The full text of the file is stored in ``file_content``;
    ``chapter_start`` and ``chapter_end`` record the chapter range it covers.
    """

    __tablename__ = "output_files"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Owning novel; the database removes output files when the novel row goes.
    novel_id = Column(Integer, ForeignKey("novels.id", ondelete="CASCADE"), nullable=False)
    filename = Column(String(500), nullable=False)
    chapter_start = Column(Integer, nullable=False)
    chapter_end = Column(Integer, nullable=False)
    file_content = Column(Text, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

    novel = relationship("Novel", back_populates="output_files")

    __table_args__ = (
        Index("idx_novel_output", "novel_id"),
    )

    def __repr__(self):
        """Return a short debug representation.

        ``file_content`` is deliberately left out because it can hold the
        text of many chapters.
        """
        return (
            f"<OutputFile(novel_id={self.novel_id}, filename='{self.filename}', "
            f"chapters={self.chapter_start}-{self.chapter_end})>"
        )
|
|
|
|
| |
|
|
def init_database():
    """Create all tables declared on ``Base`` if they do not exist yet.

    Raises:
        Exception: whatever ``create_all`` raised; it is logged with its
            traceback and then re-raised unchanged.
    """
    try:
        Base.metadata.create_all(bind=engine)
        logger.info("Database tables created/verified successfully.")
    except Exception as e:
        # logger.exception records the traceback as well as the message,
        # which a plain logger.error call would drop.
        logger.exception(f"Failed to initialize database: {e}")
        raise
|
|
|
|
@contextmanager
def get_db_session() -> Iterator[Session]:
    """Provide a session wrapped in a commit-or-rollback scope.

    A new session is created from ``SessionLocal`` and yielded to the
    ``with`` body. If the body finishes normally the session is committed;
    if the body or the commit raises, the session is rolled back and the
    exception propagates. The session is closed in every case.

    Yields:
        The session to use inside the ``with`` block.
    """
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()
|
|
|
|
| |
|
|
def create_novel(
    title: str,
    original_filename: str,
    mc_original_name: Optional[str] = None,
    mc_indian_name: Optional[str] = None,
    selected_model: Optional[str] = None,
) -> Novel:
    """Insert a new novel row and return the populated ``Novel`` object.

    Args:
        title: Title stored for the novel.
        original_filename: Name of the uploaded file.
        mc_original_name: Optional value for ``Novel.mc_original_name``.
        mc_indian_name: Optional value for ``Novel.mc_indian_name``.
        selected_model: Optional value for ``Novel.selected_model``.

    Returns:
        The new ``Novel``. It is flushed and refreshed before the session
        ends, so ``id`` and the column defaults are populated; the object is
        detached once this function returns.
    """
    fields = {
        "title": title,
        "original_filename": original_filename,
        "mc_original_name": mc_original_name,
        "mc_indian_name": mc_indian_name,
        "selected_model": selected_model,
    }
    with get_db_session() as db:
        record = Novel(**fields)
        db.add(record)
        # Flush to run the INSERT, then refresh to load the stored row.
        db.flush()
        db.refresh(record)
        return record
|
|
|
|
def get_novel(novel_id: int) -> Optional[Novel]:
    """Look up a novel by primary key.

    Args:
        novel_id: Primary key of the novel.

    Returns:
        The ``Novel`` (detached once this function returns), or ``None`` when
        no row has that ID.
    """
    with get_db_session() as session:
        # Session.get() replaces the legacy Query.get() lookup.
        return session.get(Novel, novel_id)
|
|
|
|
def get_all_novels() -> List[Dict]:
    """Return every novel as a plain dict, newest first.

    Returns:
        One dict per novel, ordered by ``created_at`` descending.
        ``created_at`` is converted with ``str()``.
    """

    def _as_dict(novel) -> Dict:
        # Flatten one Novel row into the dict shape callers receive.
        return {
            "id": novel.id,
            "title": novel.title,
            "original_filename": novel.original_filename,
            "mc_original_name": novel.mc_original_name,
            "mc_indian_name": novel.mc_indian_name,
            "total_chapters": novel.total_chapters,
            "processed_chapters": novel.processed_chapters,
            "status": novel.status,
            "selected_model": novel.selected_model,
            "created_at": str(novel.created_at),
        }

    with get_db_session() as db:
        newest_first = db.query(Novel).order_by(Novel.created_at.desc())
        return [_as_dict(row) for row in newest_first.all()]
|
|
|
|
def update_novel_status(
    novel_id: int, status: str, processed_chapters: Optional[int] = None
) -> None:
    """Set a novel's status and, optionally, its processed-chapter counter.

    Args:
        novel_id: Primary key of the novel to update.
        status: New value for ``Novel.status``.
        processed_chapters: New value for ``Novel.processed_chapters``; left
            untouched when ``None``.

    When no novel has that ID nothing is changed and a warning is logged.
    """
    with get_db_session() as session:
        # Session.get() replaces the legacy Query.get() lookup.
        novel = session.get(Novel, novel_id)
        if novel is None:
            logger.warning(
                f"update_novel_status: novel ID {novel_id} not found; nothing updated."
            )
            return
        novel.status = status
        novel.updated_at = datetime.utcnow()
        if processed_chapters is not None:
            novel.processed_chapters = processed_chapters
|
|
|
|
def update_novel_chapter_count(novel_id: int, total_chapters: int) -> None:
    """Set the total chapter count of a novel.

    Args:
        novel_id: Primary key of the novel to update.
        total_chapters: New value for ``Novel.total_chapters``.

    When no novel has that ID nothing is changed and a warning is logged.
    """
    with get_db_session() as session:
        # Session.get() replaces the legacy Query.get() lookup.
        novel = session.get(Novel, novel_id)
        if novel is None:
            logger.warning(
                f"update_novel_chapter_count: novel ID {novel_id} not found; nothing updated."
            )
            return
        novel.total_chapters = total_chapters
        novel.updated_at = datetime.utcnow()
|
|
|
|
def delete_novel(novel_id: int) -> bool:
    """Delete a novel; its glossary, chapters and output files cascade.

    Args:
        novel_id: Primary key of the novel to delete.

    Returns:
        ``True`` when the novel existed and the delete was committed,
        ``False`` when no novel has that ID.
    """
    with get_db_session() as session:
        # Session.get() replaces the legacy Query.get() lookup.
        novel = session.get(Novel, novel_id)
        if novel is None:
            return False
        session.delete(novel)

    # Reached only after the session scope committed, so the message is never
    # written for a delete that was rolled back.
    logger.info(f"Deleted novel ID {novel_id} and all associated data.")
    return True
|
|
|
|
| |
|
|
def store_chapters(novel_id: int, chapters: List[Dict]):
    """Insert the parsed chapters of a novel in one transaction.

    Args:
        novel_id: Primary key of the owning novel.
        chapters: Dicts that must contain ``"chapter_number"`` and
            ``"text"``; ``"title"`` is optional and defaults to ``""``.

    Raises:
        KeyError: if a chapter dict lacks ``"chapter_number"`` or ``"text"``.
    """
    with get_db_session() as db:
        for item in chapters:
            row = Chapter(
                novel_id=novel_id,
                chapter_number=item["chapter_number"],
                original_title=item.get("title", ""),
                original_text=item["text"],
            )
            db.add(row)
        # Send the pending INSERTs before the session scope commits.
        db.flush()
|
|
|
|
def get_unprocessed_chapters(novel_id: int, limit: int = 20) -> List[Dict]:
    """Return the next unprocessed chapters of a novel, lowest number first.

    Args:
        novel_id: Primary key of the novel.
        limit: Maximum number of chapters to return.

    Returns:
        Dicts with ``id``, ``chapter_number``, ``original_title``,
        ``original_text`` and ``retry_count``.
    """
    with get_db_session() as db:
        pending = (
            db.query(Chapter)
            .filter(Chapter.novel_id == novel_id)
            # "== False" is intentional: SQLAlchemy turns it into SQL.
            .filter(Chapter.is_processed == False)  # noqa: E712
            .order_by(Chapter.chapter_number)
            .limit(limit)
            .all()
        )
        batch = []
        for row in pending:
            batch.append(
                {
                    "id": row.id,
                    "chapter_number": row.chapter_number,
                    "original_title": row.original_title,
                    "original_text": row.original_text,
                    "retry_count": row.retry_count,
                }
            )
        return batch
|
|
|
|
def save_translated_chapter(chapter_id: int, translated_text: str) -> None:
    """Store a chapter's translation and mark the chapter as processed.

    Args:
        chapter_id: Primary key of the chapter.
        translated_text: Translated text to store.

    When no chapter has that ID nothing is changed and a warning is logged.
    """
    with get_db_session() as session:
        # Session.get() replaces the legacy Query.get() lookup.
        chapter = session.get(Chapter, chapter_id)
        if chapter is None:
            logger.warning(
                f"save_translated_chapter: chapter ID {chapter_id} not found; nothing saved."
            )
            return
        chapter.translated_text = translated_text
        chapter.is_processed = True
        chapter.processed_at = datetime.utcnow()
|
|
|
|
def mark_chapter_error(chapter_id: int, error_message: str) -> None:
    """Record a failure on a chapter and increment its retry counter.

    Args:
        chapter_id: Primary key of the chapter.
        error_message: Text stored in ``Chapter.error_message``.

    When no chapter has that ID nothing is changed and a warning is logged.
    """
    with get_db_session() as session:
        # Session.get() replaces the legacy Query.get() lookup.
        chapter = session.get(Chapter, chapter_id)
        if chapter is None:
            logger.warning(
                f"mark_chapter_error: chapter ID {chapter_id} not found; nothing recorded."
            )
            return
        chapter.error_message = error_message
        # retry_count is a nullable column; treat NULL as zero so the
        # increment cannot fail with a TypeError.
        chapter.retry_count = (chapter.retry_count or 0) + 1
|
|
|
|
def get_translated_chapters_range(
    novel_id: int, start_chapter: int, end_chapter: int
) -> List[Dict]:
    """Return the processed chapters whose number lies in an inclusive range.

    Args:
        novel_id: Primary key of the novel.
        start_chapter: Lowest chapter number to include.
        end_chapter: Highest chapter number to include.

    Returns:
        Dicts with ``chapter_number``, ``original_title`` and
        ``translated_text``, ordered by chapter number. Unprocessed chapters
        inside the range are left out.
    """
    conditions = (
        Chapter.novel_id == novel_id,
        Chapter.chapter_number >= start_chapter,
        Chapter.chapter_number <= end_chapter,
        # "== True" is intentional: SQLAlchemy turns it into SQL.
        Chapter.is_processed == True,  # noqa: E712
    )
    with get_db_session() as db:
        rows = (
            db.query(Chapter)
            .filter(*conditions)
            .order_by(Chapter.chapter_number)
            .all()
        )
        translated = []
        for row in rows:
            translated.append(
                {
                    "chapter_number": row.chapter_number,
                    "original_title": row.original_title,
                    "translated_text": row.translated_text,
                }
            )
        return translated
|
|
|
|
def get_processed_chapter_count(novel_id: int) -> int:
    """Count the chapters of a novel that are flagged as processed.

    Args:
        novel_id: Primary key of the novel.

    Returns:
        Number of chapters with ``is_processed`` set.
    """
    with get_db_session() as db:
        processed = db.query(Chapter).filter(
            Chapter.novel_id == novel_id,
            # "== True" is intentional: SQLAlchemy turns it into SQL.
            Chapter.is_processed == True,  # noqa: E712
        )
        return processed.count()
|
|
|
|
| |
|
|
def add_glossary_entry(
    novel_id: int,
    original_name: str,
    indian_name: str,
    entry_type: str = "character",
    chapter_number: int = 1,
) -> Optional[GlossaryEntry]:
    """Add a glossary mapping, or count another use of an existing one.

    Names are stripped of surrounding whitespace before use. If the novel
    already has an entry for ``original_name`` its ``usage_count`` is
    incremented and that entry is returned; otherwise a new entry is created.

    Args:
        novel_id: Primary key of the owning novel.
        original_name: Name as it appears in the source text.
        indian_name: Replacement name.
        entry_type: Category; stored stripped and lower-cased.
        chapter_number: Stored as ``first_seen_chapter`` for a new entry.

    Returns:
        The existing or newly created entry, or ``None`` when either name is
        blank or the insert hit the unique constraint.
    """
    original = (original_name or "").strip()
    indian = (indian_name or "").strip()
    if not original or not indian:
        # A blank name would be stored as a meaningless mapping; skip it.
        logger.warning(
            f"Glossary entry skipped, blank name: '{original_name}' -> '{indian_name}'"
        )
        return None

    with get_db_session() as session:
        try:
            existing = (
                session.query(GlossaryEntry)
                .filter(
                    GlossaryEntry.novel_id == novel_id,
                    GlossaryEntry.original_name == original,
                )
                .first()
            )
            if existing:
                # usage_count is a nullable column; treat NULL as zero.
                existing.usage_count = (existing.usage_count or 0) + 1
                return existing

            entry = GlossaryEntry(
                novel_id=novel_id,
                original_name=original,
                indian_name=indian,
                entry_type=entry_type.strip().lower(),
                first_seen_chapter=chapter_number,
            )
            session.add(entry)
            # Flush now so a unique-constraint violation surfaces here.
            session.flush()
            logger.info(
                f"Added glossary: '{original}' -> '{indian}' ({entry_type})"
            )
            return entry
        except IntegrityError:
            # Another writer inserted the same name between lookup and flush.
            session.rollback()
            logger.warning(
                f"Duplicate glossary entry ignored: '{original}'"
            )
            return None
|
|
|
|
def add_glossary_entries_bulk(
    novel_id: int, mappings: List[Dict], chapter_number: int = 1
):
    """Add several glossary mappings, one ``add_glossary_entry`` call each.

    Args:
        novel_id: Primary key of the owning novel.
        mappings: Dicts with ``"original"`` and ``"indian"`` names and an
            optional ``"type"`` (``"other"`` when missing or blank).
        chapter_number: Passed through as the first-seen chapter.

    Mappings that are not dicts, or whose ``"original"`` or ``"indian"``
    value is missing, not a string, or blank, are skipped with a warning
    instead of being stored as empty glossary rows.
    """
    for mapping in mappings or []:
        if not isinstance(mapping, dict):
            logger.warning(f"Skipping malformed glossary mapping: {mapping!r}")
            continue

        original = mapping.get("original")
        indian = mapping.get("indian")
        names_ok = (
            isinstance(original, str)
            and isinstance(indian, str)
            and original.strip()
            and indian.strip()
        )
        if not names_ok:
            logger.warning(f"Skipping malformed glossary mapping: {mapping!r}")
            continue

        entry_type = mapping.get("type")
        if not isinstance(entry_type, str) or not entry_type.strip():
            entry_type = "other"

        add_glossary_entry(
            novel_id=novel_id,
            original_name=original,
            indian_name=indian,
            entry_type=entry_type,
            chapter_number=chapter_number,
        )
|
|
|
|
def get_glossary(novel_id: int, limit: int = 300) -> List[Dict]:
    """Return a novel's glossary entries, most-used first.

    Entries are ordered by ``usage_count`` descending; entries with equal
    counts are ordered by ``first_seen_chapter`` ascending.

    Args:
        novel_id: Primary key of the novel.
        limit: Maximum number of entries to return.

    Returns:
        Dicts with ``original``, ``indian``, ``type`` and ``usage_count``.
    """
    ordering = (
        GlossaryEntry.usage_count.desc(),
        GlossaryEntry.first_seen_chapter.asc(),
    )
    with get_db_session() as db:
        rows = (
            db.query(GlossaryEntry)
            .filter(GlossaryEntry.novel_id == novel_id)
            .order_by(*ordering)
            .limit(limit)
            .all()
        )
        glossary = []
        for row in rows:
            glossary.append(
                {
                    "original": row.original_name,
                    "indian": row.indian_name,
                    "type": row.entry_type,
                    "usage_count": row.usage_count,
                }
            )
        return glossary
|
|
|
|
def get_glossary_as_text(novel_id: int, limit: int = 300) -> str:
    """Format a novel's glossary as text for injection into a prompt.

    Entries come from ``get_glossary`` and are grouped by type in the order
    each type first appears. Every group is a ``### <label>:`` heading
    followed by one ``original → indian`` line per entry.

    Args:
        novel_id: Primary key of the novel.
        limit: Maximum number of entries, passed to ``get_glossary``.

    Returns:
        The formatted glossary, or a fixed Hindi notice when the novel has
        no entries yet.
    """
    entries = get_glossary(novel_id, limit)
    if not entries:
        return "अभी तक कोई glossary entry नहीं है।"

    # Hindi (English) headings for the known entry types.
    type_labels = {
        "character": "पात्र (Characters)",
        "place": "स्थान (Places)",
        "sect": "संप्रदाय/संगठन (Sects/Organizations)",
        "item": "वस्तुएं (Items)",
        "technique": "तकनीक/कला (Techniques)",
        "other": "अन्य (Others)",
    }

    # Group by type; a plain dict keeps the first-seen order of the types.
    by_type: Dict[str, List[Dict]] = {}
    for entry in entries:
        # entry_type is a nullable column; file missing types under "other".
        by_type.setdefault(entry["type"] or "other", []).append(entry)

    lines = []
    for entry_type, items in by_type.items():
        # Unknown types fall back to their own name in title case.
        label = type_labels.get(entry_type, entry_type.title())
        lines.append(f"\n### {label}:")
        lines.extend(
            f" - {item['original']} → {item['indian']}" for item in items
        )

    return "\n".join(lines)
|
|
|
|
def get_glossary_count(novel_id: int) -> int:
    """Count the glossary entries stored for a novel.

    Args:
        novel_id: Primary key of the novel.

    Returns:
        Number of ``GlossaryEntry`` rows belonging to the novel.
    """
    with get_db_session() as db:
        for_novel = db.query(GlossaryEntry).filter(
            GlossaryEntry.novel_id == novel_id
        )
        return for_novel.count()
|
|
|
|
| |
|
|
def save_output_file(
    novel_id: int,
    filename: str,
    chapter_start: int,
    chapter_end: int,
    file_content: str,
) -> OutputFile:
    """Persist a generated output file and return its ``OutputFile`` row.

    Args:
        novel_id: Primary key of the owning novel.
        filename: Name recorded for the file.
        chapter_start: First chapter number recorded for the file.
        chapter_end: Last chapter number recorded for the file.
        file_content: Full text of the file.

    Returns:
        The new ``OutputFile``; it is flushed before the session ends and is
        detached once this function returns.
    """
    fields = {
        "novel_id": novel_id,
        "filename": filename,
        "chapter_start": chapter_start,
        "chapter_end": chapter_end,
        "file_content": file_content,
    }
    with get_db_session() as db:
        record = OutputFile(**fields)
        db.add(record)
        # Flush to run the INSERT before the object is handed back.
        db.flush()
        return record
|
|
|
|
def get_output_files(novel_id: int) -> List[Dict]:
    """List the metadata of a novel's output files, by starting chapter.

    Only the metadata columns are selected, so the potentially large
    ``file_content`` of every file is not loaded just to build the listing.

    Args:
        novel_id: Primary key of the novel.

    Returns:
        Dicts with ``id``, ``filename``, ``chapter_start``, ``chapter_end``
        and ``created_at`` (converted with ``str()``), ordered by
        ``chapter_start``.
    """
    with get_db_session() as session:
        rows = (
            session.query(
                OutputFile.id,
                OutputFile.filename,
                OutputFile.chapter_start,
                OutputFile.chapter_end,
                OutputFile.created_at,
            )
            .filter(OutputFile.novel_id == novel_id)
            .order_by(OutputFile.chapter_start)
            .all()
        )
        return [
            {
                "id": row.id,
                "filename": row.filename,
                "chapter_start": row.chapter_start,
                "chapter_end": row.chapter_end,
                "created_at": str(row.created_at),
            }
            for row in rows
        ]
|
|
|
|
def get_output_file_content(file_id: int) -> Optional[str]:
    """Return the stored text of one output file.

    Args:
        file_id: Primary key of the output file.

    Returns:
        The file's ``file_content``, or ``None`` when no output file has
        that ID.
    """
    with get_db_session() as session:
        # Session.get() replaces the legacy Query.get() lookup.
        output_file = session.get(OutputFile, file_id)
        if output_file is None:
            return None
        return output_file.file_content
|
|