Spaces:
Sleeping
Sleeping
| """ | |
| ResearchRadar — SQLite wrapper with migrations. | |
| All write operations use parameterised queries exclusively. | |
| Never format SQL strings with user or API data. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import logging | |
| import os | |
| import sqlite3 | |
| import time | |
| from datetime import date, datetime | |
| from typing import List, Optional | |
| from app.core.config import DB_VERSION | |
| from app.core.models import Digest, Paper | |
| logger = logging.getLogger(__name__) | |
| # --------------------------------------------------------------------------- | |
| # Schema DDL (Version 1) | |
| # --------------------------------------------------------------------------- | |
| _SCHEMA_V1 = """ | |
| CREATE TABLE IF NOT EXISTS meta ( | |
| key TEXT PRIMARY KEY, | |
| value TEXT NOT NULL | |
| ); | |
| CREATE TABLE IF NOT EXISTS papers ( | |
| paper_id TEXT PRIMARY KEY, | |
| source TEXT NOT NULL, | |
| title TEXT NOT NULL, | |
| abstract TEXT NOT NULL, | |
| summary_llm TEXT, | |
| authors TEXT NOT NULL, | |
| published_date TEXT NOT NULL, | |
| categories TEXT NOT NULL, | |
| app_category TEXT NOT NULL, | |
| pdf_url TEXT, | |
| abstract_url TEXT NOT NULL, | |
| citation_count INTEGER DEFAULT 0, | |
| relevance_score REAL DEFAULT 0.0, | |
| composite_score REAL DEFAULT 0.0, | |
| fetched_at TEXT NOT NULL, | |
| is_bookmarked INTEGER DEFAULT 0, | |
| is_read INTEGER DEFAULT 0 | |
| ); | |
| CREATE TABLE IF NOT EXISTS digests ( | |
| digest_id TEXT PRIMARY KEY, | |
| week_start TEXT NOT NULL, | |
| generated_at TEXT NOT NULL, | |
| total_fetched INTEGER, | |
| total_ranked INTEGER, | |
| fetch_errors TEXT, | |
| videos_json TEXT | |
| ); | |
| CREATE TABLE IF NOT EXISTS digest_papers ( | |
| digest_id TEXT NOT NULL, | |
| paper_id TEXT NOT NULL, | |
| rank_order INTEGER NOT NULL, | |
| PRIMARY KEY (digest_id, paper_id), | |
| FOREIGN KEY (digest_id) REFERENCES digests(digest_id), | |
| FOREIGN KEY (paper_id) REFERENCES papers(paper_id) | |
| ); | |
| """ | |
| # --------------------------------------------------------------------------- | |
| # Connection | |
| # --------------------------------------------------------------------------- | |
| _DB_RETRY_MAX = 3 | |
| _DB_RETRY_SLEEP = 0.5 | |
| def get_connection(db_path: str) -> sqlite3.Connection: | |
| """Return a connection with row_factory and WAL mode enabled.""" | |
| conn = sqlite3.connect(db_path) | |
| conn.row_factory = sqlite3.Row | |
| conn.execute('PRAGMA journal_mode=WAL') | |
| conn.execute('PRAGMA foreign_keys=ON') | |
| return conn | |
| def _retry_on_locked(func): | |
| """Decorator: retry up to _DB_RETRY_MAX times on 'database is locked'.""" | |
| def wrapper(*args, **kwargs): | |
| for attempt in range(_DB_RETRY_MAX): | |
| try: | |
| return func(*args, **kwargs) | |
| except sqlite3.OperationalError as exc: | |
| if 'database is locked' in str(exc) and attempt < _DB_RETRY_MAX - 1: | |
| logger.warning('DB locked — retrying (%d/%d)', attempt + 1, _DB_RETRY_MAX) | |
| time.sleep(_DB_RETRY_SLEEP) | |
| else: | |
| raise | |
| return wrapper | |
| # --------------------------------------------------------------------------- | |
| # Initialisation & Migrations | |
| # --------------------------------------------------------------------------- | |
| def initialize(db_path: str) -> None: | |
| """Create tables and run any pending migrations.""" | |
| conn = get_connection(db_path) | |
| try: | |
| conn.executescript(_SCHEMA_V1) | |
| # Set version if not present | |
| row = conn.execute( | |
| "SELECT value FROM meta WHERE key = 'db_version'" | |
| ).fetchone() | |
| if row is None: | |
| conn.execute( | |
| "INSERT INTO meta (key, value) VALUES ('db_version', ?)", | |
| (str(DB_VERSION),), | |
| ) | |
| else: | |
| current = int(row['value']) | |
| if current < DB_VERSION: | |
| run_migrations(conn, current, DB_VERSION) | |
| conn.commit() | |
| finally: | |
| conn.close() | |
| def run_migrations(conn: sqlite3.Connection, current: int, target: int) -> None: | |
| """Apply sequential migrations from *current* to *target* version.""" | |
| logger.info('Migrating DB from v%d to v%d', current, target) | |
| if current < 2: | |
| try: | |
| conn.execute("ALTER TABLE papers ADD COLUMN summary_llm TEXT") | |
| logger.info('V2 Migration: Added summary_llm column to papers table.') | |
| except sqlite3.OperationalError as e: | |
| if 'duplicate column name' in str(e).lower(): | |
| pass # Already exists | |
| else: | |
| raise | |
| if current < 3: | |
| try: | |
| conn.execute("ALTER TABLE digests ADD COLUMN videos_json TEXT") | |
| logger.info('V3 Migration: Added videos_json column to digests table.') | |
| except sqlite3.OperationalError as e: | |
| if 'duplicate column name' in str(e).lower(): | |
| pass # Already exists | |
| else: | |
| raise | |
| conn.execute( | |
| "UPDATE meta SET value = ? WHERE key = 'db_version'", | |
| (str(target),), | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # Paper helpers | |
| # --------------------------------------------------------------------------- | |
| def _paper_to_row(paper: Paper) -> tuple: | |
| return ( | |
| paper.paper_id, | |
| paper.source, | |
| paper.title, | |
| paper.abstract, | |
| paper.summary_llm, | |
| json.dumps(paper.authors), | |
| paper.published_date.isoformat(), | |
| json.dumps(paper.categories), | |
| paper.app_category, | |
| paper.pdf_url, | |
| paper.abstract_url, | |
| paper.citation_count, | |
| paper.relevance_score, | |
| paper.composite_score, | |
| paper.fetched_at.isoformat(), | |
| int(paper.is_bookmarked), | |
| int(paper.is_read), | |
| ) | |
| def _row_to_paper(row: sqlite3.Row) -> Paper: | |
| return Paper( | |
| paper_id=row['paper_id'], | |
| source=row['source'], | |
| title=row['title'], | |
| abstract=row['abstract'], | |
| summary_llm=row['summary_llm'], | |
| authors=json.loads(row['authors']), | |
| published_date=date.fromisoformat(row['published_date']), | |
| categories=json.loads(row['categories']), | |
| app_category=row['app_category'], | |
| pdf_url=row['pdf_url'], | |
| abstract_url=row['abstract_url'], | |
| citation_count=row['citation_count'], | |
| relevance_score=row['relevance_score'], | |
| composite_score=row['composite_score'], | |
| fetched_at=datetime.fromisoformat(row['fetched_at']), | |
| is_bookmarked=bool(row['is_bookmarked']), | |
| is_read=bool(row['is_read']), | |
| ) | |
| # --------------------------------------------------------------------------- | |
| # CRUD Operations | |
| # --------------------------------------------------------------------------- | |
| def save_digest(db_path: str, digest: Digest) -> None: | |
| """Transactional insert of a digest + all its papers.""" | |
| conn = get_connection(db_path) | |
| try: | |
| conn.execute('BEGIN') | |
| # Insert digest record | |
| conn.execute( | |
| """INSERT OR REPLACE INTO digests | |
| (digest_id, week_start, generated_at, total_fetched, | |
| total_ranked, fetch_errors, videos_json) | |
| VALUES (?, ?, ?, ?, ?, ?, ?)""", | |
| ( | |
| digest.digest_id, | |
| digest.week_start.isoformat(), | |
| digest.generated_at.isoformat(), | |
| digest.total_fetched, | |
| digest.total_ranked, | |
| json.dumps(digest.fetch_errors), | |
| json.dumps(digest.videos), | |
| ), | |
| ) | |
| # Insert papers and link to digest | |
| rank = 0 | |
| for category, papers in digest.papers.items(): | |
| for paper in papers: | |
| conn.execute( | |
| """INSERT OR REPLACE INTO papers | |
| (paper_id, source, title, abstract, summary_llm, authors, | |
| published_date, categories, app_category, pdf_url, | |
| abstract_url, citation_count, relevance_score, | |
| composite_score, fetched_at, is_bookmarked, is_read) | |
| VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)""", | |
| _paper_to_row(paper), | |
| ) | |
| rank += 1 | |
| conn.execute( | |
| """INSERT OR REPLACE INTO digest_papers | |
| (digest_id, paper_id, rank_order) VALUES (?, ?, ?)""", | |
| (digest.digest_id, paper.paper_id, rank), | |
| ) | |
| conn.commit() | |
| logger.info('Saved digest %s with %d papers', digest.digest_id, rank) | |
| except Exception: | |
| conn.rollback() | |
| logger.exception('Failed to save digest — rolled back') | |
| raise | |
| finally: | |
| conn.close() | |
| def get_latest_digest(db_path: str) -> Optional[Digest]: | |
| """Load the most recent digest.""" | |
| conn = get_connection(db_path) | |
| try: | |
| row = conn.execute( | |
| 'SELECT * FROM digests ORDER BY generated_at DESC LIMIT 1' | |
| ).fetchone() | |
| if row is None: | |
| return None | |
| digest = Digest( | |
| digest_id=row['digest_id'], | |
| week_start=date.fromisoformat(row['week_start']), | |
| generated_at=datetime.fromisoformat(row['generated_at']), | |
| total_fetched=row['total_fetched'], | |
| total_ranked=row['total_ranked'], | |
| fetch_errors=json.loads(row['fetch_errors'] or '[]'), | |
| videos=json.loads(row.get('videos_json', '[]')), | |
| ) | |
| # Load papers linked to this digest | |
| paper_rows = conn.execute( | |
| """SELECT p.* FROM papers p | |
| INNER JOIN digest_papers dp ON p.paper_id = dp.paper_id | |
| WHERE dp.digest_id = ? | |
| ORDER BY dp.rank_order""", | |
| (digest.digest_id,), | |
| ).fetchall() | |
| papers_by_cat: dict = {} | |
| for pr in paper_rows: | |
| paper = _row_to_paper(pr) | |
| papers_by_cat.setdefault(paper.app_category, []).append(paper) | |
| digest.papers = papers_by_cat | |
| return digest | |
| finally: | |
| conn.close() | |
| def get_papers(db_path: str, category: str, limit: int = 10) -> List[Paper]: | |
| """Get papers for a category, ordered by composite score.""" | |
| conn = get_connection(db_path) | |
| try: | |
| rows = conn.execute( | |
| """SELECT * FROM papers | |
| WHERE app_category = ? | |
| ORDER BY composite_score DESC | |
| LIMIT ?""", | |
| (category, limit), | |
| ).fetchall() | |
| return [_row_to_paper(r) for r in rows] | |
| finally: | |
| conn.close() | |
| def toggle_bookmark(db_path: str, paper_id: str) -> bool: | |
| """Toggle bookmark state; returns the new state.""" | |
| conn = get_connection(db_path) | |
| try: | |
| conn.execute( | |
| """UPDATE papers | |
| SET is_bookmarked = CASE WHEN is_bookmarked = 0 THEN 1 ELSE 0 END | |
| WHERE paper_id = ?""", | |
| (paper_id,), | |
| ) | |
| conn.commit() | |
| row = conn.execute( | |
| 'SELECT is_bookmarked FROM papers WHERE paper_id = ?', | |
| (paper_id,), | |
| ).fetchone() | |
| return bool(row['is_bookmarked']) if row else False | |
| finally: | |
| conn.close() | |
| def mark_read(db_path: str, paper_id: str) -> None: | |
| """Mark a paper as read.""" | |
| conn = get_connection(db_path) | |
| try: | |
| conn.execute( | |
| 'UPDATE papers SET is_read = 1 WHERE paper_id = ?', | |
| (paper_id,), | |
| ) | |
| conn.commit() | |
| finally: | |
| conn.close() | |
| def get_bookmarked_papers(db_path: str) -> List[Paper]: | |
| """Return all bookmarked papers ordered by composite score.""" | |
| conn = get_connection(db_path) | |
| try: | |
| rows = conn.execute( | |
| """SELECT * FROM papers | |
| WHERE is_bookmarked = 1 | |
| ORDER BY composite_score DESC""" | |
| ).fetchall() | |
| return [_row_to_paper(r) for r in rows] | |
| finally: | |
| conn.close() | |