Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import aiosqlite | |
| from datetime import date, datetime, timedelta | |
| from typing import Any, Dict, List, Optional | |
| from contextlib import asynccontextmanager | |
class PapersDatabase():
    """Async SQLite store for the daily papers cache and per-paper evaluations.

    ``init_db(config)`` must be awaited before any other method: it sets
    ``db_path`` from ``config.db_path`` and creates the tables. Each method
    opens its own short-lived connection through ``get_connection``.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Path to the SQLite file; remains None until init_db() is awaited.
        self.db_path = None

    async def init_db(self, config):
        """Initialize the database with required tables.

        Table creation is idempotent (``IF NOT EXISTS``), and the singleton
        ``latest_date`` row is only seeded with today's date when missing.

        Args:
            config: object exposing a ``db_path`` attribute.
        """
        self.db_path = config.db_path
        async with self.get_connection() as conn:
            cursor = await conn.cursor()

            # Create papers cache table (one row per listing date)
            await cursor.execute('''
                CREATE TABLE IF NOT EXISTS papers_cache (
                    date_str TEXT PRIMARY KEY,
                    html_content TEXT NOT NULL,
                    parsed_cards TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # Create papers table for individual arXiv papers
            await cursor.execute('''
                CREATE TABLE IF NOT EXISTS papers (
                    arxiv_id TEXT PRIMARY KEY,
                    title TEXT NOT NULL,
                    authors TEXT NOT NULL,
                    abstract TEXT,
                    categories TEXT,
                    published_date TEXT,
                    evaluation_content TEXT,
                    evaluation_score REAL,
                    overall_score REAL,
                    evaluation_tags TEXT,
                    evaluation_status TEXT DEFAULT 'not_started',
                    is_evaluated BOOLEAN DEFAULT FALSE,
                    evaluation_date TIMESTAMP,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # Single-row table (CHECK id = 1) tracking the most recent available date
            await cursor.execute('''
                CREATE TABLE IF NOT EXISTS latest_date (
                    id INTEGER PRIMARY KEY CHECK (id = 1),
                    date_str TEXT NOT NULL,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            ''')

            # Seed the latest_date row with today's date if it doesn't exist yet
            await cursor.execute('''
                INSERT OR IGNORE INTO latest_date (id, date_str)
                VALUES (1, ?)
            ''', (date.today().isoformat(),))

            await conn.commit()

    @asynccontextmanager
    async def get_connection(self):
        """Async context manager yielding a configured connection.

        Usage: ``async with self.get_connection() as conn: ...``. Rows are
        returned as ``aiosqlite.Row`` (column access by name). The connection
        is closed on exit, including when configuration or the caller's body
        raises.
        """
        conn = await aiosqlite.connect(self.db_path)
        try:
            conn.row_factory = aiosqlite.Row  # Enable dict-like access
            # WAL journal mode for better read/write concurrency
            await conn.execute("PRAGMA journal_mode=WAL")
            await conn.execute("PRAGMA synchronous=NORMAL")
            await conn.execute("PRAGMA cache_size=10000")
            await conn.execute("PRAGMA temp_store=MEMORY")
            yield conn
        finally:
            # Inside finally so a failing PRAGMA cannot leak the connection.
            await conn.close()

    async def get_cached_papers(self, date_str: str) -> Optional[Dict[str, Any]]:
        """Return ``{'cards': [...], 'cached_at': ...}`` for date_str, or None if not cached."""
        async with self.get_connection() as conn:
            cursor = await conn.cursor()
            await cursor.execute('''
                SELECT parsed_cards, created_at
                FROM papers_cache
                WHERE date_str = ?
            ''', (date_str,))
            row = await cursor.fetchone()
            if row:
                return {
                    'cards': json.loads(row['parsed_cards']),
                    'cached_at': row['created_at']
                }
            return None

    async def cache_papers(self, date_str: str, html_content: str, parsed_cards: List[Dict[str, Any]]):
        """Store (or overwrite) the raw HTML and JSON-encoded parsed cards for date_str."""
        async with self.get_connection() as conn:
            cursor = await conn.cursor()
            await cursor.execute('''
                INSERT OR REPLACE INTO papers_cache
                (date_str, html_content, parsed_cards, updated_at)
                VALUES (?, ?, ?, CURRENT_TIMESTAMP)
            ''', (date_str, html_content, json.dumps(parsed_cards)))
            await conn.commit()

    async def get_latest_cached_date(self) -> Optional[str]:
        """Return the date_str stored in the singleton latest_date row, or None."""
        async with self.get_connection() as conn:
            cursor = await conn.cursor()
            await cursor.execute('SELECT date_str FROM latest_date WHERE id = 1')
            row = await cursor.fetchone()
            return row['date_str'] if row else None

    async def update_latest_date(self, date_str: str):
        """Set the latest available date in the singleton latest_date row."""
        async with self.get_connection() as conn:
            cursor = await conn.cursor()
            await cursor.execute('''
                UPDATE latest_date
                SET date_str = ?, updated_at = CURRENT_TIMESTAMP
                WHERE id = 1
            ''', (date_str,))
            await conn.commit()

    async def is_cache_fresh(self, date_str: str, max_age_hours: int = 24) -> bool:
        """Return True if date_str's cache row was updated less than max_age_hours ago.

        updated_at is written with CURRENT_TIMESTAMP, which SQLite records in
        UTC. The age is therefore computed inside SQLite against
        julianday('now') (also UTC). This avoids comparing a UTC timestamp
        with Python's local-time datetime.now(). A missing row or an
        unparseable timestamp counts as stale.
        """
        async with self.get_connection() as conn:
            cursor = await conn.cursor()
            await cursor.execute('''
                SELECT (julianday('now') - julianday(updated_at)) * 86400.0 AS age_seconds
                FROM papers_cache
                WHERE date_str = ?
            ''', (date_str,))
            row = await cursor.fetchone()
            if row is None or row['age_seconds'] is None:
                return False
            return row['age_seconds'] < max_age_hours * 3600

    async def cleanup_old_cache(self, days_to_keep: int = 7):
        """Delete papers_cache rows whose updated_at is older than days_to_keep days.

        The cutoff is computed by SQLite in UTC, matching how updated_at is
        written. Both sides are compared as julianday numbers. A plain string
        comparison of 'YYYY-MM-DD HH:MM:SS' against a Python isoformat()
        value ('...T...') would misorder every row on the cutoff day.
        """
        age_modifier = f'{-days_to_keep} days'  # e.g. '-7 days'
        async with self.get_connection() as conn:
            cursor = await conn.cursor()
            await cursor.execute('''
                DELETE FROM papers_cache
                WHERE julianday(updated_at) < julianday('now', ?)
            ''', (age_modifier,))
            await conn.commit()

    # Papers table methods

    async def insert_paper(self, arxiv_id: str, title: str, authors: str, abstract: Optional[str] = None,
                           categories: Optional[str] = None, published_date: Optional[str] = None):
        """Insert a paper, or refresh its metadata if arxiv_id already exists.

        Uses an UPSERT instead of INSERT OR REPLACE. REPLACE deletes the old
        row, which would silently discard any stored evaluation (content,
        scores, status) and reset created_at. Here only the metadata columns
        and updated_at are overwritten.
        """
        async with self.get_connection() as conn:
            cursor = await conn.cursor()
            await cursor.execute('''
                INSERT INTO papers
                (arxiv_id, title, authors, abstract, categories, published_date, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
                ON CONFLICT(arxiv_id) DO UPDATE SET
                    title = excluded.title,
                    authors = excluded.authors,
                    abstract = excluded.abstract,
                    categories = excluded.categories,
                    published_date = excluded.published_date,
                    updated_at = CURRENT_TIMESTAMP
            ''', (arxiv_id, title, authors, abstract, categories, published_date))
            await conn.commit()

    async def get_paper(self, arxiv_id: str) -> Optional[Dict[str, Any]]:
        """Return the full papers row for arxiv_id as a dict, or None if absent."""
        async with self.get_connection() as conn:
            cursor = await conn.cursor()
            await cursor.execute('''
                SELECT * FROM papers WHERE arxiv_id = ?
            ''', (arxiv_id,))
            row = await cursor.fetchone()
            if row:
                return dict(row)
            return None

    async def get_papers_by_evaluation_status(self, is_evaluated: Optional[bool] = None) -> List[Dict[str, Any]]:
        """Return papers (newest created_at first), optionally filtered by is_evaluated.

        Args:
            is_evaluated: None returns all papers; True/False filters on the flag.
        """
        async with self.get_connection() as conn:
            cursor = await conn.cursor()
            if is_evaluated is None:
                await cursor.execute('SELECT * FROM papers ORDER BY created_at DESC')
            else:
                await cursor.execute('''
                    SELECT * FROM papers
                    WHERE is_evaluated = ?
                    ORDER BY created_at DESC
                ''', (is_evaluated,))
            rows = await cursor.fetchall()
            return [dict(row) for row in rows]

    async def update_paper_evaluation(self, arxiv_id: str, evaluation_content: str,
                                      evaluation_score: Optional[float] = None,
                                      overall_score: Optional[float] = None,
                                      evaluation_tags: Optional[str] = None):
        """Store evaluation results and mark the paper as evaluated ('completed')."""
        async with self.get_connection() as conn:
            cursor = await conn.cursor()
            await cursor.execute('''
                UPDATE papers
                SET evaluation_content = ?,
                    evaluation_score = ?,
                    overall_score = ?,
                    evaluation_tags = ?,
                    is_evaluated = TRUE,
                    evaluation_status = 'completed',
                    evaluation_date = CURRENT_TIMESTAMP,
                    updated_at = CURRENT_TIMESTAMP
                WHERE arxiv_id = ?
            ''', (evaluation_content, evaluation_score, overall_score, evaluation_tags, arxiv_id))
            await conn.commit()

    async def update_paper_status(self, arxiv_id: str, status: str):
        """Set evaluation_status for a paper (is_evaluated is left unchanged)."""
        async with self.get_connection() as conn:
            cursor = await conn.cursor()
            await cursor.execute('''
                UPDATE papers
                SET evaluation_status = ?,
                    updated_at = CURRENT_TIMESTAMP
                WHERE arxiv_id = ?
            ''', (status, arxiv_id))
            await conn.commit()

    async def get_unevaluated_papers(self) -> List[Dict[str, Any]]:
        """Get all papers that haven't been evaluated yet."""
        return await self.get_papers_by_evaluation_status(is_evaluated=False)

    async def get_evaluated_papers(self) -> List[Dict[str, Any]]:
        """Get all papers that have been evaluated."""
        return await self.get_papers_by_evaluation_status(is_evaluated=True)

    async def search_papers(self, query: str) -> List[Dict[str, Any]]:
        """Case-insensitive (ASCII) substring search over title, authors and abstract.

        LIKE metacharacters in query ('%', '_') and the escape character
        itself are escaped, so the query is matched literally.
        """
        escaped = (query.replace('\\', '\\\\')
                        .replace('%', '\\%')
                        .replace('_', '\\_'))
        search_pattern = f'%{escaped}%'
        async with self.get_connection() as conn:
            cursor = await conn.cursor()
            await cursor.execute('''
                SELECT * FROM papers
                WHERE title LIKE ? ESCAPE '\\'
                   OR authors LIKE ? ESCAPE '\\'
                   OR abstract LIKE ? ESCAPE '\\'
                ORDER BY created_at DESC
            ''', (search_pattern, search_pattern, search_pattern))
            rows = await cursor.fetchall()
            return [dict(row) for row in rows]

    async def delete_paper(self, arxiv_id: str):
        """Delete a paper from the database (no-op if it does not exist)."""
        async with self.get_connection() as conn:
            cursor = await conn.cursor()
            await cursor.execute('DELETE FROM papers WHERE arxiv_id = ?', (arxiv_id,))
            await conn.commit()

    async def get_papers_count(self) -> Dict[str, int]:
        """Return counts of total, evaluated and unevaluated papers."""
        async with self.get_connection() as conn:
            cursor = await conn.cursor()
            await cursor.execute('SELECT COUNT(*) as total FROM papers')
            total_row = await cursor.fetchone()
            total = total_row['total']

            await cursor.execute('SELECT COUNT(*) as evaluated FROM papers WHERE is_evaluated = TRUE')
            evaluated_row = await cursor.fetchone()
            evaluated = evaluated_row['evaluated']

            return {
                'total': total,
                'evaluated': evaluated,
                'unevaluated': total - evaluated
            }

    def __str__(self):
        return f"PapersDatabase(db_path={self.db_path})"

    def __repr__(self):
        return self.__str__()
# Shared module-level instance; callers must await db.init_db(config) before use.
db: PapersDatabase = PapersDatabase()