| """SQLite storage layer β hand-written SQL, no ORM. | |
| Handles schema creation, paper/author/chunk ingestion, and analytical queries. | |
| All queries are parameterized (no f-strings for SQL). | |
| """ | |
| import logging | |
| import sqlite3 | |
| from pathlib import Path | |
| from src.ingestion.base_loader import PaperRecord | |
| logger = logging.getLogger(__name__) | |
| SCHEMA_SQL = """ | |
| CREATE TABLE IF NOT EXISTS papers ( | |
| id TEXT PRIMARY KEY, -- source::source_id (e.g., "hf_acl_ocl::P18-1001") | |
| source_id TEXT NOT NULL, -- original ID from source | |
| source TEXT NOT NULL, -- "hf_acl_ocl", "acl_anthology", "arxiv" | |
| title TEXT NOT NULL, | |
| abstract TEXT, | |
| full_text TEXT, -- NULL for abstract-only papers | |
| year INTEGER, | |
| venue TEXT, -- "acl", "emnlp", "naacl", etc. | |
| volume TEXT, -- "long", "short", "findings", etc. | |
| url TEXT, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| ); | |
| CREATE TABLE IF NOT EXISTS authors ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| paper_id TEXT NOT NULL REFERENCES papers(id), | |
| name TEXT NOT NULL, | |
| position INTEGER NOT NULL -- author order (0-indexed) | |
| ); | |
| CREATE TABLE IF NOT EXISTS chunks ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| paper_id TEXT NOT NULL REFERENCES papers(id), | |
| chunk_text TEXT NOT NULL, | |
| chunk_type TEXT, -- "abstract", "introduction", "method", "full_text" | |
| chunk_index INTEGER, -- order within paper | |
| token_count INTEGER | |
| ); | |
| CREATE TABLE IF NOT EXISTS methods ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| paper_id TEXT NOT NULL REFERENCES papers(id), | |
| method_name TEXT NOT NULL, | |
| method_type TEXT -- "model", "technique", "framework" | |
| ); | |
| CREATE TABLE IF NOT EXISTS datasets ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| paper_id TEXT NOT NULL REFERENCES papers(id), | |
| dataset_name TEXT NOT NULL, | |
| task_type TEXT -- "classification", "QA", "generation", etc. | |
| ); | |
| CREATE TABLE IF NOT EXISTS tasks ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| paper_id TEXT NOT NULL REFERENCES papers(id), | |
| task_name TEXT NOT NULL | |
| ); | |
| CREATE TABLE IF NOT EXISTS topics ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| paper_id TEXT NOT NULL REFERENCES papers(id), | |
| topic_name TEXT NOT NULL | |
| ); | |
| -- Indexes for common queries | |
| CREATE INDEX IF NOT EXISTS idx_papers_year ON papers(year); | |
| CREATE INDEX IF NOT EXISTS idx_papers_venue ON papers(venue); | |
| CREATE INDEX IF NOT EXISTS idx_papers_source ON papers(source); | |
| CREATE INDEX IF NOT EXISTS idx_authors_paper ON authors(paper_id); | |
| CREATE INDEX IF NOT EXISTS idx_chunks_paper ON chunks(paper_id); | |
| CREATE INDEX IF NOT EXISTS idx_methods_name ON methods(method_name); | |
| CREATE INDEX IF NOT EXISTS idx_methods_paper ON methods(paper_id); | |
| CREATE INDEX IF NOT EXISTS idx_datasets_name ON datasets(dataset_name); | |
| CREATE INDEX IF NOT EXISTS idx_datasets_paper ON datasets(paper_id); | |
| CREATE INDEX IF NOT EXISTS idx_tasks_paper ON tasks(paper_id); | |
| CREATE INDEX IF NOT EXISTS idx_topics_paper ON topics(paper_id); | |
| CREATE INDEX IF NOT EXISTS idx_topics_name ON topics(topic_name); | |
| """ | |
class SQLiteDB:
    """SQLite database interface for ResearchRadar."""

    def __init__(self, db_path: str | Path):
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)

    def get_connection(self) -> sqlite3.Connection:
        """Get a new database connection with row factory enabled."""
        conn = sqlite3.connect(str(self.db_path))
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA journal_mode=WAL")  # readers don't block the writer
        conn.execute("PRAGMA foreign_keys=ON")   # REFERENCES are not enforced by default
        return conn

    def create_schema(self):
        """Create all tables and indexes (idempotent via IF NOT EXISTS)."""
        conn = self.get_connection()
        try:
            conn.executescript(SCHEMA_SQL)
            conn.commit()
            logger.info("Database schema created at %s", self.db_path)
        finally:
            conn.close()
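    # A minimal setup sketch (the db path below is hypothetical):
    #
    #     db = SQLiteDB("data/papers.db")
    #     db.create_schema()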
    # ── Paper ingestion ──────────────────────────────────────────────

    def insert_papers(self, papers: list[PaperRecord], batch_size: int = 500):
        """Insert papers and their authors into the database.

        Uses INSERT OR IGNORE to skip duplicates (by paper id).
        """
        conn = self.get_connection()
        try:
            processed = 0
            for i in range(0, len(papers), batch_size):
                batch = papers[i : i + batch_size]
                paper_rows = [
                    (
                        p.paper_id(),
                        p.source_id,
                        p.source,
                        p.title,
                        p.abstract,
                        p.full_text,
                        p.year,
                        p.venue,
                        p.volume,
                        p.url,
                    )
                    for p in batch
                ]
                conn.executemany(
                    """INSERT OR IGNORE INTO papers
                           (id, source_id, source, title, abstract, full_text,
                            year, venue, volume, url)
                       VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                    paper_rows,
                )
                author_rows = []
                for p in batch:
                    for pos, name in enumerate(p.authors):
                        author_rows.append((p.paper_id(), name, pos))
                if author_rows:
                    conn.executemany(
                        """INSERT OR IGNORE INTO authors (paper_id, name, position)
                           VALUES (?, ?, ?)""",
                        author_rows,
                    )
                processed += len(batch)
            conn.commit()
            logger.info("Processed %d papers (duplicates skipped)", processed)
        finally:
            conn.close()
    # ── Chunk operations ─────────────────────────────────────────────

    def insert_chunks(self, chunks: list[dict], batch_size: int = 1000):
        """Insert chunks into the database.

        Each chunk dict should have: paper_id, chunk_text, chunk_type,
        chunk_index, token_count.
        """
        conn = self.get_connection()
        try:
            for i in range(0, len(chunks), batch_size):
                batch = chunks[i : i + batch_size]
                conn.executemany(
                    """INSERT INTO chunks
                           (paper_id, chunk_text, chunk_type, chunk_index, token_count)
                       VALUES (?, ?, ?, ?, ?)""",
                    [
                        (
                            c["paper_id"],
                            c["chunk_text"],
                            c["chunk_type"],
                            c["chunk_index"],
                            c["token_count"],
                        )
                        for c in batch
                    ],
                )
            conn.commit()
            logger.info("Inserted %d chunks into SQLite", len(chunks))
        finally:
            conn.close()

    def get_all_chunks(self) -> list[dict]:
        """Retrieve all chunks with their paper metadata."""
        conn = self.get_connection()
        try:
            rows = conn.execute(
                """SELECT c.id, c.paper_id, c.chunk_text, c.chunk_type,
                          c.chunk_index, c.token_count,
                          p.title, p.year, p.venue
                   FROM chunks c
                   JOIN papers p ON c.paper_id = p.id
                   ORDER BY c.paper_id, c.chunk_index"""
            ).fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()

    def get_chunk_texts_and_ids(self) -> tuple[list[str], list[int]]:
        """Get all chunk texts and their IDs for BM25 index building."""
        conn = self.get_connection()
        try:
            rows = conn.execute(
                "SELECT id, chunk_text FROM chunks ORDER BY id"
            ).fetchall()
            ids = [row["id"] for row in rows]
            texts = [row["chunk_text"] for row in rows]
            return texts, ids
        finally:
            conn.close()
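    # A minimal sketch of how the texts/ids pair feeds a BM25 index
    # (assumes the third-party rank_bm25 package and naive whitespace
    # tokenization; both are illustrative choices, not part of this module):
    #
    #     from rank_bm25 import BM25Okapi
    #     texts, ids = db.get_chunk_texts_and_ids()
    #     bm25 = BM25Okapi([t.lower().split() for t in texts])
    #     scores = bm25.get_scores("attention mechanism".split())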
    # ── Enrichment operations ────────────────────────────────────────

    def insert_methods(self, paper_id: str, methods: list[dict]):
        """Insert extracted methods for a paper.

        Each method dict: {"method_name": str, "method_type": str | None}
        """
        conn = self.get_connection()
        try:
            conn.executemany(
                """INSERT INTO methods (paper_id, method_name, method_type)
                   VALUES (?, ?, ?)""",
                [(paper_id, m["method_name"], m.get("method_type")) for m in methods],
            )
            conn.commit()
        finally:
            conn.close()

    def insert_datasets(self, paper_id: str, datasets: list[dict]):
        """Insert extracted datasets for a paper.

        Each dataset dict: {"dataset_name": str, "task_type": str | None}
        """
        conn = self.get_connection()
        try:
            conn.executemany(
                """INSERT INTO datasets (paper_id, dataset_name, task_type)
                   VALUES (?, ?, ?)""",
                [(paper_id, d["dataset_name"], d.get("task_type")) for d in datasets],
            )
            conn.commit()
        finally:
            conn.close()

    def insert_tasks(self, paper_id: str, task_names: list[str]):
        """Insert extracted tasks for a paper."""
        conn = self.get_connection()
        try:
            conn.executemany(
                "INSERT INTO tasks (paper_id, task_name) VALUES (?, ?)",
                [(paper_id, t) for t in task_names],
            )
            conn.commit()
        finally:
            conn.close()

    def insert_topics(self, paper_id: str, topic_names: list[str]):
        """Insert extracted topics for a paper."""
        conn = self.get_connection()
        try:
            conn.executemany(
                "INSERT INTO topics (paper_id, topic_name) VALUES (?, ?)",
                [(paper_id, t) for t in topic_names],
            )
            conn.commit()
        finally:
            conn.close()
    # ── Query operations ─────────────────────────────────────────────

    def get_paper_count(self) -> int:
        conn = self.get_connection()
        try:
            return conn.execute("SELECT COUNT(*) FROM papers").fetchone()[0]
        finally:
            conn.close()

    def get_chunk_count(self) -> int:
        conn = self.get_connection()
        try:
            return conn.execute("SELECT COUNT(*) FROM chunks").fetchone()[0]
        finally:
            conn.close()

    def get_paper_by_id(self, paper_id: str) -> dict | None:
        """Fetch a single paper with its authors, methods, and datasets."""
        conn = self.get_connection()
        try:
            paper = conn.execute(
                "SELECT * FROM papers WHERE id = ?", (paper_id,)
            ).fetchone()
            if paper is None:
                return None
            authors = conn.execute(
                "SELECT name FROM authors WHERE paper_id = ? ORDER BY position",
                (paper_id,),
            ).fetchall()
            methods = conn.execute(
                "SELECT method_name, method_type FROM methods WHERE paper_id = ?",
                (paper_id,),
            ).fetchall()
            datasets = conn.execute(
                "SELECT dataset_name, task_type FROM datasets WHERE paper_id = ?",
                (paper_id,),
            ).fetchall()
            result = dict(paper)
            result["authors"] = [row["name"] for row in authors]
            result["methods"] = [dict(row) for row in methods]
            result["datasets"] = [dict(row) for row in datasets]
            return result
        finally:
            conn.close()
    def _browse_conditions(
        self,
        venue: str | None = None,
        volume: str | None = None,
        year: int | None = None,
        method: str | None = None,
        dataset: str | None = None,
        author: str | None = None,
    ) -> tuple[str, list]:
        """Build a WHERE clause and params for paper browsing/counting."""
        conditions: list[str] = []
        params: list = []
        if venue:
            conditions.append("p.venue = ?")
            params.append(venue)
        if volume:
            conditions.append("p.volume = ?")
            params.append(volume)
        if year:
            conditions.append("p.year = ?")
            params.append(year)
        if method:
            conditions.append(
                "p.id IN (SELECT paper_id FROM methods WHERE method_name LIKE ?)"
            )
            params.append(f"%{method}%")
        if dataset:
            conditions.append(
                "p.id IN (SELECT paper_id FROM datasets WHERE dataset_name LIKE ?)"
            )
            params.append(f"%{dataset}%")
        if author:
            conditions.append(
                "p.id IN (SELECT paper_id FROM authors WHERE name LIKE ?)"
            )
            params.append(f"%{author}%")
        where_clause = " AND ".join(conditions) if conditions else "1=1"
        return where_clause, params
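    # For example (hypothetical filter values):
    #
    #     where, params = self._browse_conditions(venue="acl", year=2020)
    #     # where  == "p.venue = ? AND p.year = ?"
    #     # params == ["acl", 2020]
    #
    # Only fixed SQL fragments ever enter the clause; user-supplied values
    # travel through params, so interpolating where_clause via an f-string
    # in browse_papers/count_papers does not open an injection path.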
    def browse_papers(
        self,
        venue: str | None = None,
        volume: str | None = None,
        year: int | None = None,
        method: str | None = None,
        dataset: str | None = None,
        author: str | None = None,
        limit: int = 20,
        offset: int = 0,
    ) -> list[dict]:
        """Browse/filter papers with optional filters."""
        where_clause, params = self._browse_conditions(
            venue=venue, volume=volume, year=year,
            method=method, dataset=dataset, author=author,
        )
        params.extend([limit, offset])
        conn = self.get_connection()
        try:
            rows = conn.execute(
                f"""SELECT p.id, p.title, p.abstract, p.year, p.venue, p.url
                    FROM papers p
                    WHERE {where_clause}
                    ORDER BY p.year DESC, p.title
                    LIMIT ? OFFSET ?""",
                params,
            ).fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()

    def count_papers(
        self,
        venue: str | None = None,
        volume: str | None = None,
        year: int | None = None,
        method: str | None = None,
        dataset: str | None = None,
        author: str | None = None,
    ) -> int:
        """Count papers matching the given filters."""
        where_clause, params = self._browse_conditions(
            venue=venue, volume=volume, year=year,
            method=method, dataset=dataset, author=author,
        )
        conn = self.get_connection()
        try:
            row = conn.execute(
                f"SELECT COUNT(*) FROM papers p WHERE {where_clause}",
                params,
            ).fetchone()
            return row[0]
        finally:
            conn.close()
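    # A minimal pagination sketch (filter values are hypothetical):
    #
    #     total = db.count_papers(venue="acl", year=2020)
    #     page_two = db.browse_papers(venue="acl", year=2020, limit=20, offset=20)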
    # ── Analytical queries (trend analytics) ─────────────────────────

    def papers_per_venue(self) -> list[dict]:
        """Paper count per venue, aggregated across all years."""
        conn = self.get_connection()
        try:
            rows = conn.execute(
                """SELECT venue, COUNT(*) as paper_count
                   FROM papers
                   WHERE venue IS NOT NULL
                   GROUP BY venue
                   ORDER BY paper_count DESC"""
            ).fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()

    def papers_per_venue_per_year(self) -> list[dict]:
        """Paper count per (venue, year) pair."""
        conn = self.get_connection()
        try:
            rows = conn.execute(
                """SELECT venue, year, COUNT(*) as paper_count
                   FROM papers
                   GROUP BY venue, year
                   ORDER BY year, venue"""
            ).fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()

    def top_methods_by_year(self, top_n: int = 10) -> list[dict]:
        """Most popular methods per year using window functions."""
        conn = self.get_connection()
        try:
            rows = conn.execute(
                """SELECT method_name, year, count, rank FROM (
                       SELECT m.method_name, p.year, COUNT(*) as count,
                              RANK() OVER (PARTITION BY p.year ORDER BY COUNT(*) DESC) as rank
                       FROM methods m
                       JOIN papers p ON m.paper_id = p.id
                       GROUP BY m.method_name, p.year
                   )
                   WHERE rank <= ?
                   ORDER BY year, rank""",
                (top_n,),
            ).fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()
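    # NOTE: RANK() assigns equal ranks to ties, so this and the other
    # *_by_year helpers below can return more than top_n rows for a year
    # when counts tie at the cutoff.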
    def top_datasets_by_year(self, top_n: int = 10) -> list[dict]:
        """Most used datasets per year."""
        conn = self.get_connection()
        try:
            rows = conn.execute(
                """SELECT dataset_name, year, count, rank FROM (
                       SELECT d.dataset_name, p.year, COUNT(*) as count,
                              RANK() OVER (PARTITION BY p.year ORDER BY COUNT(*) DESC) as rank
                       FROM datasets d
                       JOIN papers p ON d.paper_id = p.id
                       GROUP BY d.dataset_name, p.year
                   )
                   WHERE rank <= ?
                   ORDER BY year, rank""",
                (top_n,),
            ).fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()
    def method_trend(self, method_name: str) -> list[dict]:
        """Track a specific method's adoption over time."""
        conn = self.get_connection()
        try:
            rows = conn.execute(
                """SELECT p.year, COUNT(*) as paper_count
                   FROM methods m
                   JOIN papers p ON m.paper_id = p.id
                   WHERE m.method_name LIKE ?
                   GROUP BY p.year
                   ORDER BY p.year""",
                (f"%{method_name}%",),
            ).fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()

    def dataset_trend(self, dataset_name: str) -> list[dict]:
        """Track a specific dataset's usage over time."""
        conn = self.get_connection()
        try:
            rows = conn.execute(
                """SELECT p.year, COUNT(*) as paper_count
                   FROM datasets d
                   JOIN papers p ON d.paper_id = p.id
                   WHERE d.dataset_name LIKE ?
                   GROUP BY p.year
                   ORDER BY p.year""",
                (f"%{dataset_name}%",),
            ).fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()

    def get_enrichment_stats(self) -> dict:
        """Get counts of unique enriched entities (distinct names)."""
        conn = self.get_connection()
        try:
            paper_count = conn.execute("SELECT COUNT(*) FROM papers").fetchone()[0]
            method_count = conn.execute(
                "SELECT COUNT(DISTINCT method_name) FROM methods"
            ).fetchone()[0]
            dataset_count = conn.execute(
                "SELECT COUNT(DISTINCT dataset_name) FROM datasets"
            ).fetchone()[0]
            task_count = conn.execute(
                "SELECT COUNT(DISTINCT task_name) FROM tasks"
            ).fetchone()[0]
            topic_count = conn.execute(
                "SELECT COUNT(DISTINCT topic_name) FROM topics"
            ).fetchone()[0]
            papers_with_methods = conn.execute(
                "SELECT COUNT(DISTINCT paper_id) FROM methods"
            ).fetchone()[0]
            return {
                "total_papers": paper_count,
                "total_methods": method_count,
                "total_datasets": dataset_count,
                "total_tasks": task_count,
                "total_topics": topic_count,
                "papers_with_methods": papers_with_methods,
            }
        finally:
            conn.close()
    def get_authors_for_papers(self, paper_ids: list[str]) -> dict[str, list[str]]:
        """Batch-fetch authors for multiple papers.

        Returns:
            Dict mapping paper_id -> list of author names (ordered by position).
        """
        if not paper_ids:
            return {}
        conn = self.get_connection()
        try:
            placeholders = ",".join("?" * len(paper_ids))
            rows = conn.execute(
                f"SELECT paper_id, name FROM authors "
                f"WHERE paper_id IN ({placeholders}) "
                f"ORDER BY paper_id, position",
                paper_ids,
            ).fetchall()
            result: dict[str, list[str]] = {}
            for row in rows:
                result.setdefault(row["paper_id"], []).append(row["name"])
            return result
        finally:
            conn.close()
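    # NOTE: the IN (...) expansion above uses one host parameter per id.
    # SQLite caps host parameters per statement (999 by default before
    # 3.32.0, 32766 since), so callers should chunk very large id lists.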
    _ENTITY_TABLE_MAP = {
        "methods": ("methods", "method_name"),
        "datasets": ("datasets", "dataset_name"),
        "tasks": ("tasks", "task_name"),
        "topics": ("topics", "topic_name"),
    }

    def get_entity_list(self, entity_type: str, limit: int = 500) -> list[dict]:
        """Get all unique entity names with their paper counts.

        Args:
            entity_type: One of "methods", "datasets", "tasks", "topics".
            limit: Maximum entries to return.

        Returns:
            List of dicts with keys: name, count. Sorted by count descending.
        """
        if entity_type not in self._ENTITY_TABLE_MAP:
            raise ValueError(f"Unknown entity type: {entity_type}")
        table, col = self._ENTITY_TABLE_MAP[entity_type]
        conn = self.get_connection()
        try:
            # Identifiers can't be bound as parameters; table/col come from
            # the whitelist above, so the f-string is safe here.
            rows = conn.execute(
                f"SELECT {col} AS name, COUNT(*) AS count "
                f"FROM {table} GROUP BY {col} ORDER BY count DESC LIMIT ?",
                (limit,),
            ).fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()
    # ── Co-occurrence analytics ──────────────────────────────────────

    def method_dataset_cooccurrence(self, top_n: int = 20) -> list[dict]:
        """Find which methods and datasets are used together most often.

        Returns rows of: method_name, dataset_name, co_count, ranked by frequency.
        """
        conn = self.get_connection()
        try:
            rows = conn.execute(
                """SELECT m.method_name, d.dataset_name, COUNT(*) as co_count
                   FROM methods m
                   JOIN datasets d ON m.paper_id = d.paper_id
                   GROUP BY m.method_name, d.dataset_name
                   ORDER BY co_count DESC
                   LIMIT ?""",
                (top_n,),
            ).fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()

    def method_task_cooccurrence(self, top_n: int = 20) -> list[dict]:
        """Find which methods are applied to which tasks most often.

        Returns rows of: method_name, task_name, co_count, ranked by frequency.
        """
        conn = self.get_connection()
        try:
            rows = conn.execute(
                """SELECT m.method_name, t.task_name, COUNT(*) as co_count
                   FROM methods m
                   JOIN tasks t ON m.paper_id = t.paper_id
                   GROUP BY m.method_name, t.task_name
                   ORDER BY co_count DESC
                   LIMIT ?""",
                (top_n,),
            ).fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()
    # ── Author analytics ─────────────────────────────────────────────

    def top_authors(self, top_n: int = 20) -> list[dict]:
        """Most prolific authors by paper count."""
        conn = self.get_connection()
        try:
            rows = conn.execute(
                """SELECT a.name, COUNT(DISTINCT a.paper_id) as paper_count
                   FROM authors a
                   GROUP BY a.name
                   ORDER BY paper_count DESC
                   LIMIT ?""",
                (top_n,),
            ).fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()

    def author_collaboration_pairs(self, top_n: int = 20) -> list[dict]:
        """Most frequent co-author pairs.

        Returns rows of: author_a, author_b, shared_papers.
        """
        conn = self.get_connection()
        try:
            # a1.name < a2.name excludes self-pairs and keeps each pair once.
            rows = conn.execute(
                """SELECT a1.name as author_a, a2.name as author_b,
                          COUNT(DISTINCT a1.paper_id) as shared_papers
                   FROM authors a1
                   JOIN authors a2 ON a1.paper_id = a2.paper_id
                        AND a1.name < a2.name
                   GROUP BY a1.name, a2.name
                   ORDER BY shared_papers DESC
                   LIMIT ?""",
                (top_n,),
            ).fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()
    # ── Task analytics ───────────────────────────────────────────────

    def top_tasks_by_year(self, top_n: int = 10) -> list[dict]:
        """Most popular tasks per year using window functions."""
        conn = self.get_connection()
        try:
            rows = conn.execute(
                """SELECT task_name, year, count, rank FROM (
                       SELECT t.task_name, p.year, COUNT(*) as count,
                              RANK() OVER (PARTITION BY p.year ORDER BY COUNT(*) DESC) as rank
                       FROM tasks t
                       JOIN papers p ON t.paper_id = p.id
                       GROUP BY t.task_name, p.year
                   )
                   WHERE rank <= ?
                   ORDER BY year, rank""",
                (top_n,),
            ).fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()

    def task_trend(self, task_name: str) -> list[dict]:
        """Track a specific task's popularity over time."""
        conn = self.get_connection()
        try:
            rows = conn.execute(
                """SELECT p.year, COUNT(*) as paper_count
                   FROM tasks t
                   JOIN papers p ON t.paper_id = p.id
                   WHERE t.task_name LIKE ?
                   GROUP BY p.year
                   ORDER BY p.year""",
                (f"%{task_name}%",),
            ).fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()
    # ── Topic analytics ──────────────────────────────────────────────

    def top_topics_by_year(self, top_n: int = 10) -> list[dict]:
        """Most popular topics per year using window functions."""
        conn = self.get_connection()
        try:
            rows = conn.execute(
                """SELECT topic_name, year, count, rank FROM (
                       SELECT tp.topic_name, p.year, COUNT(*) as count,
                              RANK() OVER (PARTITION BY p.year ORDER BY COUNT(*) DESC) as rank
                       FROM topics tp
                       JOIN papers p ON tp.paper_id = p.id
                       GROUP BY tp.topic_name, p.year
                   )
                   WHERE rank <= ?
                   ORDER BY year, rank""",
                (top_n,),
            ).fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()

    def topic_trend(self, topic_name: str) -> list[dict]:
        """Track a specific topic's popularity over time."""
        conn = self.get_connection()
        try:
            rows = conn.execute(
                """SELECT p.year, COUNT(*) as paper_count
                   FROM topics tp
                   JOIN papers p ON tp.paper_id = p.id
                   WHERE tp.topic_name LIKE ?
                   GROUP BY p.year
                   ORDER BY p.year""",
                (f"%{topic_name}%",),
            ).fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()
    # ── Venue analytics ──────────────────────────────────────────────

    def venue_method_profile(self, venue: str, top_n: int = 10) -> list[dict]:
        """Top methods at a specific venue (across all years)."""
        conn = self.get_connection()
        try:
            rows = conn.execute(
                """SELECT m.method_name, COUNT(*) as paper_count
                   FROM methods m
                   JOIN papers p ON m.paper_id = p.id
                   WHERE p.venue = ?
                   GROUP BY m.method_name
                   ORDER BY paper_count DESC
                   LIMIT ?""",
                (venue, top_n),
            ).fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()

    def year_over_year_growth(self) -> list[dict]:
        """Paper count per year with year-over-year growth rate."""
        conn = self.get_connection()
        try:
            rows = conn.execute(
                """SELECT year, paper_count,
                          LAG(paper_count) OVER (ORDER BY year) as prev_count,
                          CASE
                              WHEN LAG(paper_count) OVER (ORDER BY year) > 0
                              THEN ROUND(
                                  100.0 * (paper_count - LAG(paper_count) OVER (ORDER BY year))
                                  / LAG(paper_count) OVER (ORDER BY year), 1)
                              ELSE NULL
                          END as growth_pct
                   FROM (
                       SELECT year, COUNT(*) as paper_count
                       FROM papers
                       GROUP BY year
                   )
                   ORDER BY year"""
            ).fetchall()
            return [dict(row) for row in rows]
        finally:
            conn.close()
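
# A minimal end-to-end analytics sketch (db path and method name are
# hypothetical; assumes the schema has been created and populated):
#
#     db = SQLiteDB("data/papers.db")
#     for row in db.method_trend("transformer"):
#         print(row["year"], row["paper_count"])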