# Novelind / database.py
# Ruhivig65's picture
# Update database.py
# 4a75092 verified
"""
database.py - PostgreSQL database models and CRUD operations.
Handles glossary storage, novel tracking, and chapter management.
"""
import json
import logging
from datetime import datetime
from typing import List, Dict, Optional, Tuple
from contextlib import contextmanager
from sqlalchemy import (
create_engine, Column, Integer, String, Text, DateTime,
Boolean, ForeignKey, UniqueConstraint, Index, Enum as SAEnum,
event, text as sa_text
)
from sqlalchemy.orm import (
declarative_base, sessionmaker, relationship, Session
)
from sqlalchemy.pool import QueuePool
from sqlalchemy.exc import IntegrityError, OperationalError
from config import DATABASE_URL
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# ─── Engine & Session Setup ─────────────────────────────────────────────────────
# One shared engine for the whole module, connecting to DATABASE_URL (imported
# from config) through a QueuePool.
engine = create_engine(
    DATABASE_URL,
    poolclass=QueuePool,
    pool_size=5,  # connections kept open in the pool
    max_overflow=10,  # extra connections allowed beyond pool_size
    pool_timeout=30,  # seconds to wait for a free connection
    pool_recycle=1800,  # recycle connections older than 30 minutes
    pool_pre_ping=True,  # test a connection for liveness on checkout
    echo=False
)
# expire_on_commit=False prevents DetachedInstanceError when object attributes
# are read after the session that loaded them has been committed and closed.
SessionLocal = sessionmaker(bind=engine, autocommit=False, autoflush=False, expire_on_commit=False)
# Declarative base class shared by every model defined in this module.
Base = declarative_base()
# ─── Models ─────────────────────────────────────────────────────────────────────
class Novel(Base):
    """ORM row for a single uploaded novel and its processing progress."""

    __tablename__ = "novels"

    # Identity and source file.
    id = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(500), nullable=False)
    original_filename = Column(String(500), nullable=False)

    # Optional name pair (presumably the main character's original and
    # localized names -- confirm against callers).
    mc_original_name = Column(String(200), nullable=True)
    mc_indian_name = Column(String(200), nullable=True)

    # Progress counters, both starting at zero.
    total_chapters = Column(Integer, default=0)
    processed_chapters = Column(Integer, default=0)

    # Expected values: uploaded, processing, completed, error.
    status = Column(String(50), default="uploaded")
    selected_model = Column(String(200), nullable=True)

    # Naive UTC timestamps; updated_at is refreshed on every UPDATE.
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Child collections; deleting a novel through the ORM removes them too.
    glossary_entries = relationship(
        "GlossaryEntry",
        back_populates="novel",
        cascade="all, delete-orphan",
    )
    chapters = relationship(
        "Chapter",
        back_populates="novel",
        cascade="all, delete-orphan",
    )
    output_files = relationship(
        "OutputFile",
        back_populates="novel",
        cascade="all, delete-orphan",
    )

    def __repr__(self):
        ident, title, status = self.id, self.title, self.status
        return f"<Novel(id={ident}, title='{title}', status='{status}')>"
class GlossaryEntry(Base):
    """ORM row mapping one original name/term to its localized form for a novel."""

    __tablename__ = "glossary_entries"

    id = Column(Integer, primary_key=True, autoincrement=True)
    novel_id = Column(
        Integer,
        ForeignKey("novels.id", ondelete="CASCADE"),
        nullable=False,
    )
    original_name = Column(String(500), nullable=False)
    indian_name = Column(String(500), nullable=False)

    # Expected values: character, place, sect, item, technique, other.
    entry_type = Column(String(50), default="character")

    first_seen_chapter = Column(Integer, default=1)
    usage_count = Column(Integer, default=1)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Owning novel (other side of Novel.glossary_entries).
    novel = relationship("Novel", back_populates="glossary_entries")

    # A given original name may appear only once per novel.
    __table_args__ = (
        UniqueConstraint("novel_id", "original_name", name="uq_novel_original_name"),
        Index("idx_novel_glossary", "novel_id"),
        Index("idx_original_name", "original_name"),
    )

    def __repr__(self):
        original, indian, kind = self.original_name, self.indian_name, self.entry_type
        return (
            f"<GlossaryEntry(original='{original}', "
            f"indian='{indian}', type='{kind}')>"
        )
class Chapter(Base):
    """ORM row for one chapter of a novel, with its translation state."""

    __tablename__ = "chapters"

    id = Column(Integer, primary_key=True, autoincrement=True)
    novel_id = Column(
        Integer,
        ForeignKey("novels.id", ondelete="CASCADE"),
        nullable=False,
    )
    chapter_number = Column(Integer, nullable=False)

    # Source material; only the text is mandatory.
    original_title = Column(String(1000), nullable=True)
    original_text = Column(Text, nullable=False)

    # Translation result and bookkeeping.
    translated_text = Column(Text, nullable=True)
    is_processed = Column(Boolean, default=False)
    error_message = Column(Text, nullable=True)
    retry_count = Column(Integer, default=0)

    created_at = Column(DateTime, default=datetime.utcnow)
    processed_at = Column(DateTime, nullable=True)

    # Owning novel (other side of Novel.chapters).
    novel = relationship("Novel", back_populates="chapters")

    # Chapter numbers are unique within a novel.
    __table_args__ = (
        UniqueConstraint("novel_id", "chapter_number", name="uq_novel_chapter"),
        Index("idx_novel_chapter", "novel_id", "chapter_number"),
    )

    def __repr__(self):
        owner, number, done = self.novel_id, self.chapter_number, self.is_processed
        return (
            f"<Chapter(novel_id={owner}, "
            f"chapter={number}, processed={done})>"
        )
class OutputFile(Base):
    """ORM row for one generated output file covering a range of chapters.

    The original note on this model says a file is produced every 20 chapters;
    that batching is decided by callers, not by this class.
    """

    __tablename__ = "output_files"

    id = Column(Integer, primary_key=True, autoincrement=True)
    novel_id = Column(
        Integer,
        ForeignKey("novels.id", ondelete="CASCADE"),
        nullable=False,
    )
    filename = Column(String(500), nullable=False)

    # Chapter range stored in this file.
    chapter_start = Column(Integer, nullable=False)
    chapter_end = Column(Integer, nullable=False)

    # The full file body is stored in the database itself.
    file_content = Column(Text, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)

    # Owning novel (other side of Novel.output_files).
    novel = relationship("Novel", back_populates="output_files")

    __table_args__ = (
        Index("idx_novel_output", "novel_id"),
    )
# ─── Database Initialization ────────────────────────────────────────────────────
def init_database():
    """Create every table declared on ``Base`` that does not exist yet.

    Raises:
        Exception: whatever ``create_all`` raised, re-raised after logging.
    """
    try:
        Base.metadata.create_all(bind=engine)
    except Exception as exc:
        logger.error(f"Failed to initialize database: {exc}")
        raise
    else:
        logger.info("Database tables created/verified successfully.")
@contextmanager
def get_db_session() -> Session:
    """Provide a transactional session scope for a ``with`` block.

    A new session is opened from ``SessionLocal`` and handed to the caller.
    When the block finishes normally the session is committed; if the block
    (or the commit itself) raises, the session is rolled back and the
    exception propagates. The session is closed in every case.
    """
    db = SessionLocal()
    try:
        yield db
        # Commit stays inside the try so a failing commit is rolled back too.
        db.commit()
    except Exception:
        db.rollback()
        raise
    finally:
        db.close()
# ─── Novel CRUD ─────────────────────────────────────────────────────────────────
def create_novel(
    title: str,
    original_filename: str,
    mc_original_name: Optional[str] = None,
    mc_indian_name: Optional[str] = None,
    selected_model: Optional[str] = None,
) -> Novel:
    """Insert a new ``Novel`` row and return it.

    Args:
        title: Novel title.
        original_filename: Name of the uploaded file.
        mc_original_name: Optional original name stored on the novel.
        mc_indian_name: Optional localized name stored on the novel.
        selected_model: Optional model identifier stored on the novel.

    Returns:
        The persisted ``Novel`` with its primary key and column defaults
        populated.
    """
    column_values = {
        "title": title,
        "original_filename": original_filename,
        "mc_original_name": mc_original_name,
        "mc_indian_name": mc_indian_name,
        "selected_model": selected_model,
    }
    with get_db_session() as session:
        record = Novel(**column_values)
        session.add(record)
        # Flush sends the INSERT so the row gets its ID; refresh reloads the
        # row so column defaults are present on the returned object.
        session.flush()
        session.refresh(record)
        return record
def get_novel(novel_id: int) -> Optional[Novel]:
    """Fetch a novel by primary key.

    Args:
        novel_id: Primary key of the novel.

    Returns:
        The ``Novel`` instance, or ``None`` when no row has that ID. The
        object is returned after its session is closed; column attributes
        remain readable because ``SessionLocal`` uses
        ``expire_on_commit=False``.
    """
    with get_db_session() as session:
        # Session.get() replaces the legacy Query.get(), deprecated in 1.4.
        return session.get(Novel, novel_id)
def get_all_novels() -> List[Dict]:
    """Return every novel as a plain dict, newest first.

    Returns:
        One dict per novel, ordered by ``created_at`` descending, with
        ``created_at`` rendered through ``str()``.
    """
    with get_db_session() as session:
        newest_first = session.query(Novel).order_by(Novel.created_at.desc())
        return [
            {
                "id": novel.id,
                "title": novel.title,
                "original_filename": novel.original_filename,
                "mc_original_name": novel.mc_original_name,
                "mc_indian_name": novel.mc_indian_name,
                "total_chapters": novel.total_chapters,
                "processed_chapters": novel.processed_chapters,
                "status": novel.status,
                "selected_model": novel.selected_model,
                "created_at": str(novel.created_at),
            }
            for novel in newest_first.all()
        ]
def update_novel_status(
    novel_id: int, status: str, processed_chapters: Optional[int] = None
) -> None:
    """Set a novel's status and, optionally, its processed-chapter counter.

    Does nothing when no novel has the given ID.

    Args:
        novel_id: Primary key of the novel.
        status: New status string to store.
        processed_chapters: New counter value; left untouched when ``None``.
    """
    with get_db_session() as session:
        # Session.get() replaces the legacy Query.get(), deprecated in 1.4.
        novel = session.get(Novel, novel_id)
        if novel is None:
            return
        novel.status = status
        novel.updated_at = datetime.utcnow()
        if processed_chapters is not None:
            novel.processed_chapters = processed_chapters
def update_novel_chapter_count(novel_id: int, total_chapters: int) -> None:
    """Store the total number of chapters for a novel.

    Does nothing when no novel has the given ID.

    Args:
        novel_id: Primary key of the novel.
        total_chapters: Value written to ``Novel.total_chapters``.
    """
    with get_db_session() as session:
        # Session.get() replaces the legacy Query.get(), deprecated in 1.4.
        novel = session.get(Novel, novel_id)
        if novel is None:
            return
        novel.total_chapters = total_chapters
        novel.updated_at = datetime.utcnow()
def delete_novel(novel_id: int) -> bool:
    """Delete a novel; its glossary, chapters and output files go with it.

    The child rows are removed through the ``cascade="all, delete-orphan"``
    relationships declared on ``Novel``.

    Args:
        novel_id: Primary key of the novel to delete.

    Returns:
        ``True`` when a novel was found and marked for deletion, ``False``
        when no novel has that ID.
    """
    with get_db_session() as session:
        # Session.get() replaces the legacy Query.get(), deprecated in 1.4.
        novel = session.get(Novel, novel_id)
        if novel is None:
            return False
        session.delete(novel)
        logger.info(f"Deleted novel ID {novel_id} and all associated data.")
        return True
# ─── Chapter CRUD ────────────────────────────────────────────────────────────────
def store_chapters(novel_id: int, chapters: List[Dict]):
    """Persist parsed chapters for a novel in a single transaction.

    Args:
        novel_id: Primary key of the owning novel.
        chapters: Dicts with required keys ``"chapter_number"`` and
            ``"text"``, plus an optional ``"title"`` (stored as ``""`` when
            absent).

    Raises:
        KeyError: if a chapter dict lacks a required key.
    """
    with get_db_session() as session:
        rows = [
            Chapter(
                novel_id=novel_id,
                chapter_number=item["chapter_number"],
                original_title=item.get("title", ""),
                original_text=item["text"],
            )
            for item in chapters
        ]
        session.add_all(rows)
        session.flush()
def get_unprocessed_chapters(novel_id: int, limit: int = 20) -> List[Dict]:
    """Return the next batch of chapters that are not yet marked processed.

    Args:
        novel_id: Primary key of the novel.
        limit: Maximum number of chapters to return.

    Returns:
        Dicts with ``id``, ``chapter_number``, ``original_title``,
        ``original_text`` and ``retry_count``, in ascending chapter order.
    """
    with get_db_session() as session:
        pending = session.query(Chapter).filter(
            Chapter.novel_id == novel_id,
            Chapter.is_processed == False,  # noqa: E712 - SQL expression
        )
        batch = pending.order_by(Chapter.chapter_number).limit(limit).all()

        result = []
        for row in batch:
            result.append(
                {
                    "id": row.id,
                    "chapter_number": row.chapter_number,
                    "original_title": row.original_title,
                    "original_text": row.original_text,
                    "retry_count": row.retry_count,
                }
            )
        return result
def save_translated_chapter(chapter_id: int, translated_text: str) -> None:
    """Store a chapter's translation and mark the chapter as processed.

    Does nothing when no chapter has the given ID.

    Args:
        chapter_id: Primary key of the chapter.
        translated_text: Translated body to store.
    """
    with get_db_session() as session:
        # Session.get() replaces the legacy Query.get(), deprecated in 1.4.
        chapter = session.get(Chapter, chapter_id)
        if chapter is None:
            return
        chapter.translated_text = translated_text
        chapter.is_processed = True
        chapter.processed_at = datetime.utcnow()
def mark_chapter_error(chapter_id: int, error_message: str) -> None:
    """Record an error on a chapter and increment its retry counter.

    Does nothing when no chapter has the given ID.

    Args:
        chapter_id: Primary key of the chapter.
        error_message: Error text to store on the chapter.
    """
    with get_db_session() as session:
        # Session.get() replaces the legacy Query.get(), deprecated in 1.4.
        chapter = session.get(Chapter, chapter_id)
        if chapter is None:
            return
        chapter.error_message = error_message
        # retry_count is a nullable column; treat NULL as zero so the
        # increment cannot raise TypeError on such rows.
        chapter.retry_count = (chapter.retry_count or 0) + 1
def get_translated_chapters_range(
    novel_id: int, start_chapter: int, end_chapter: int
) -> List[Dict]:
    """Return processed chapters whose number lies in an inclusive range.

    Args:
        novel_id: Primary key of the novel.
        start_chapter: Lowest chapter number to include.
        end_chapter: Highest chapter number to include.

    Returns:
        Dicts with ``chapter_number``, ``original_title`` and
        ``translated_text``, in ascending chapter order.
    """
    conditions = (
        Chapter.novel_id == novel_id,
        Chapter.chapter_number >= start_chapter,
        Chapter.chapter_number <= end_chapter,
        Chapter.is_processed == True,  # noqa: E712 - SQL expression
    )
    with get_db_session() as session:
        rows = (
            session.query(Chapter)
            .filter(*conditions)
            .order_by(Chapter.chapter_number)
            .all()
        )
        result = []
        for row in rows:
            result.append(
                {
                    "chapter_number": row.chapter_number,
                    "original_title": row.original_title,
                    "translated_text": row.translated_text,
                }
            )
        return result
def get_processed_chapter_count(novel_id: int) -> int:
    """Count the chapters of a novel that are marked as processed.

    Args:
        novel_id: Primary key of the novel.

    Returns:
        Number of chapters with ``is_processed`` set to true.
    """
    with get_db_session() as session:
        processed = session.query(Chapter).filter(
            Chapter.novel_id == novel_id,
            Chapter.is_processed == True,  # noqa: E712 - SQL expression
        )
        return processed.count()
# ─── Glossary CRUD ──────────────────────────────────────────────────────────────
def add_glossary_entry(
    novel_id: int,
    original_name: str,
    indian_name: str,
    entry_type: str = "character",
    chapter_number: int = 1,
) -> Optional[GlossaryEntry]:
    """Insert a glossary mapping, or bump the usage count of an existing one.

    The lookup key is ``(novel_id, original_name.strip())``. When a row with
    that key already exists, its ``usage_count`` is incremented and the row
    is returned unchanged otherwise.

    Args:
        novel_id: Primary key of the owning novel.
        original_name: Source name/term; stored stripped.
        indian_name: Localized name/term; stored stripped.
        entry_type: Category label; stored stripped and lower-cased.
        chapter_number: Stored as ``first_seen_chapter`` on a new row.

    Returns:
        The existing or newly created ``GlossaryEntry``, or ``None`` when the
        insert hit an ``IntegrityError`` (treated as a duplicate).
    """
    with get_db_session() as session:
        try:
            key = original_name.strip()
            match = (
                session.query(GlossaryEntry)
                .filter(
                    GlossaryEntry.novel_id == novel_id,
                    GlossaryEntry.original_name == key,
                )
                .first()
            )
            if match:
                match.usage_count += 1
                return match

            created = GlossaryEntry(
                novel_id=novel_id,
                original_name=key,
                indian_name=indian_name.strip(),
                entry_type=entry_type.strip().lower(),
                first_seen_chapter=chapter_number,
            )
            session.add(created)
            # Flush now so a unique-constraint violation surfaces here.
            session.flush()
            logger.info(
                f"Added glossary: '{original_name}' -> '{indian_name}' ({entry_type})"
            )
            return created
        except IntegrityError:
            session.rollback()
            logger.warning(
                f"Duplicate glossary entry ignored: '{original_name}'"
            )
            return None
def add_glossary_entries_bulk(
    novel_id: int, mappings: List[Dict], chapter_number: int = 1
):
    """Add several glossary entries, one ``add_glossary_entry`` call each.

    Mappings whose ``"original"`` or ``"indian"`` value is missing, not a
    string, or blank after stripping are skipped with a warning. Previously
    these either created empty glossary rows or crashed on ``.strip()`` and
    aborted the rest of the batch. A missing, non-string or blank ``"type"``
    falls back to ``"other"``.

    Args:
        novel_id: Primary key of the owning novel.
        mappings: Dicts with keys ``"original"``, ``"indian"`` and optionally
            ``"type"``. ``None`` is treated as an empty list.
        chapter_number: Passed through as the chapter where entries were
            first seen.
    """
    for mapping in mappings or []:
        original = mapping.get("original")
        indian = mapping.get("indian")

        usable = (
            isinstance(original, str)
            and isinstance(indian, str)
            and original.strip()
            and indian.strip()
        )
        if not usable:
            logger.warning("Skipping malformed glossary mapping: %r", mapping)
            continue

        entry_type = mapping.get("type")
        if not isinstance(entry_type, str) or not entry_type.strip():
            entry_type = "other"

        add_glossary_entry(
            novel_id=novel_id,
            original_name=original,
            indian_name=indian,
            entry_type=entry_type,
            chapter_number=chapter_number,
        )
def get_glossary(novel_id: int, limit: int = 300) -> List[Dict]:
    """Return a novel's glossary entries as plain dicts.

    Entries are ordered by ``usage_count`` descending, then by
    ``first_seen_chapter`` ascending.

    Args:
        novel_id: Primary key of the novel.
        limit: Maximum number of entries to return.

    Returns:
        Dicts with keys ``original``, ``indian``, ``type`` and
        ``usage_count``.
    """
    ordering = (
        GlossaryEntry.usage_count.desc(),
        GlossaryEntry.first_seen_chapter.asc(),
    )
    with get_db_session() as session:
        rows = (
            session.query(GlossaryEntry)
            .filter(GlossaryEntry.novel_id == novel_id)
            .order_by(*ordering)
            .limit(limit)
            .all()
        )
        result = []
        for row in rows:
            result.append(
                {
                    "original": row.original_name,
                    "indian": row.indian_name,
                    "type": row.entry_type,
                    "usage_count": row.usage_count,
                }
            )
        return result
def get_glossary_as_text(novel_id: int, limit: int = 300) -> str:
    """Format a novel's glossary as text for prompt injection.

    Entries are grouped by type in the order the types first appear in
    ``get_glossary``'s result. Each group is a ``### <label>:`` heading
    followed by one ``original → localized`` line per entry.

    The Hindi literals and the arrow were mojibake (UTF-8 text decoded with
    the wrong codepage) and are restored here to the intended characters.

    Args:
        novel_id: Primary key of the novel.
        limit: Maximum number of glossary entries to include.

    Returns:
        The formatted glossary, or a fixed Hindi "no glossary entries yet"
        message when the novel has none.
    """
    entries = get_glossary(novel_id, limit)
    if not entries:
        return "अभी तक कोई glossary entry नहीं है।"

    type_labels = {
        "character": "पात्र (Characters)",
        "place": "स्थान (Places)",
        "sect": "संप्रदाय/संगठन (Sects/Organizations)",
        "item": "वस्तुएं (Items)",
        "technique": "तकनीक/कला (Techniques)",
        "other": "अन्य (Others)",
    }

    by_type = {}
    for e in entries:
        # entry_type is a nullable column; file NULL/empty under "other" so
        # the label lookup below cannot fail on None.
        by_type.setdefault(e["type"] or "other", []).append(e)

    lines = []
    for entry_type, items in by_type.items():
        # Types without a translated label fall back to a title-cased name.
        label = type_labels.get(entry_type) or entry_type.title()
        lines.append(f"\n### {label}:")
        lines.extend(
            f" - {item['original']} → {item['indian']}" for item in items
        )
    return "\n".join(lines)
def get_glossary_count(novel_id: int) -> int:
    """Count the glossary entries stored for a novel.

    Args:
        novel_id: Primary key of the novel.

    Returns:
        Number of ``GlossaryEntry`` rows belonging to the novel.
    """
    with get_db_session() as session:
        for_novel = session.query(GlossaryEntry).filter(
            GlossaryEntry.novel_id == novel_id
        )
        return for_novel.count()
# ─── Output File CRUD ───────────────────────────────────────────────────────────
def save_output_file(
    novel_id: int,
    filename: str,
    chapter_start: int,
    chapter_end: int,
    file_content: str,
) -> OutputFile:
    """Persist a generated output file and return its row.

    Args:
        novel_id: Primary key of the owning novel.
        filename: Name recorded for the file.
        chapter_start: First chapter number covered by the file.
        chapter_end: Last chapter number covered by the file.
        file_content: Full text body of the file.

    Returns:
        The flushed ``OutputFile`` instance.
    """
    column_values = {
        "novel_id": novel_id,
        "filename": filename,
        "chapter_start": chapter_start,
        "chapter_end": chapter_end,
        "file_content": file_content,
    }
    with get_db_session() as session:
        record = OutputFile(**column_values)
        session.add(record)
        session.flush()
        return record
def get_output_files(novel_id: int) -> List[Dict]:
    """List metadata for a novel's output files, without file contents.

    Args:
        novel_id: Primary key of the novel.

    Returns:
        Dicts with ``id``, ``filename``, ``chapter_start``, ``chapter_end``
        and ``created_at`` (rendered through ``str()``), ordered by
        ``chapter_start``.
    """
    with get_db_session() as session:
        rows = (
            session.query(OutputFile)
            .filter(OutputFile.novel_id == novel_id)
            .order_by(OutputFile.chapter_start)
            .all()
        )
        listing = []
        for row in rows:
            listing.append(
                {
                    "id": row.id,
                    "filename": row.filename,
                    "chapter_start": row.chapter_start,
                    "chapter_end": row.chapter_end,
                    "created_at": str(row.created_at),
                }
            )
        return listing
def get_output_file_content(file_id: int) -> Optional[str]:
    """Return the stored text of one output file.

    Args:
        file_id: Primary key of the output file.

    Returns:
        The file's content, or ``None`` when no output file has that ID.
    """
    with get_db_session() as session:
        # Session.get() replaces the legacy Query.get(), deprecated in 1.4.
        output = session.get(OutputFile, file_id)
        if output is None:
            return None
        return output.file_content