Spaces:
Sleeping
Sleeping
| import sqlite3 | |
| import json | |
| from datetime import datetime | |
| from typing import Optional, List, Dict | |
| from contextlib import contextmanager | |
| from src.config import config | |
class Database:
    """SQLite-backed store for URL ingestion records (table ``urls``).

    Every public method opens a short-lived connection through
    :meth:`get_connection`, so an instance keeps no open handle between calls.
    """

    def __init__(self, db_path: Optional[str] = None):
        """Bind to a database file and make sure the schema exists.

        Args:
            db_path: Path to the SQLite database file. Any falsy value falls
                back to ``config.DATABASE_PATH``.
        """
        self.db_path = db_path or config.DATABASE_PATH
        self._init_db()

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        Produces the same text as the deprecated
        ``datetime.utcnow().isoformat()``, with no ``+00:00`` suffix. New rows
        therefore sort lexically alongside existing ones in
        ``ORDER BY created_at``.
        """
        # Local import keeps the file's top-level import block unchanged.
        from datetime import timezone

        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @contextmanager
    def get_connection(self):
        """Yield a connection that commits on success and rolls back on error.

        Rows come back as ``sqlite3.Row`` so they can be turned into dicts with
        ``dict(row)``. The connection is closed on exit in every case.

        Raises:
            Any exception raised inside the ``with`` body is re-raised after
            the rollback.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            raise  # bare raise re-raises the original exception unchanged
        finally:
            conn.close()

    def _init_db(self):
        """Create the ``urls`` table and its indexes if they do not exist yet."""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS urls (
                    id TEXT PRIMARY KEY,
                    url TEXT NOT NULL UNIQUE,
                    status TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL,
                    completed_at TEXT,
                    error_message TEXT,
                    chunk_count INTEGER DEFAULT 0,
                    metadata TEXT
                )
            """)
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_url_status ON urls(status)
            """)
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_url_created ON urls(created_at)
            """)

    def create_url_record(self, url_id: str, url: str) -> Dict:
        """Insert a new record in ``"pending"`` status.

        Args:
            url_id: Primary key for the record.
            url: The URL; must be unique across the table.

        Returns:
            A dict with ``id``, ``url``, ``status``, ``created_at`` and
            ``updated_at`` (both timestamps identical).

        Raises:
            sqlite3.IntegrityError: if ``url_id`` or ``url`` already exists.
                The transaction is rolled back.
        """
        now = self._utc_now_iso()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO urls (id, url, status, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?)
            """, (url_id, url, "pending", now, now))
        return {
            "id": url_id,
            "url": url,
            "status": "pending",
            "created_at": now,
            "updated_at": now
        }

    def get_url_record(self, url_id: str) -> Optional[Dict]:
        """Return the row for ``url_id`` as a dict, or ``None`` if absent.

        ``metadata`` is returned as the raw JSON text written by
        :meth:`update_url_status` (or ``None``); it is not decoded here.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM urls WHERE id = ?", (url_id,))
            row = cursor.fetchone()
            if row:
                return dict(row)
            return None

    def update_url_status(self, url_id: str, status: str,
                          error_message: Optional[str] = None,
                          chunk_count: Optional[int] = None,
                          metadata: Optional[Dict] = None):
        """Set a record's status and bump ``updated_at``.

        Which extra columns are written depends on ``status``:

        * ``"completed"`` also sets ``completed_at``, ``chunk_count`` (``None``
          is stored as 0) and ``metadata`` (JSON-encoded; a falsy value is
          stored as NULL).
        * ``"failed"`` also sets ``error_message``.
        * Any other status only changes ``status`` and ``updated_at``; the
          optional arguments are ignored.

        Updating an unknown ``url_id`` matches no rows and is silently a no-op.
        """
        now = self._utc_now_iso()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            if status == "completed":
                cursor.execute("""
                    UPDATE urls
                    SET status = ?, updated_at = ?, completed_at = ?,
                        chunk_count = ?, metadata = ?
                    WHERE id = ?
                """, (status, now, now, chunk_count or 0,
                      json.dumps(metadata) if metadata else None, url_id))
            elif status == "failed":
                cursor.execute("""
                    UPDATE urls
                    SET status = ?, updated_at = ?, error_message = ?
                    WHERE id = ?
                """, (status, now, error_message, url_id))
            else:
                cursor.execute("""
                    UPDATE urls
                    SET status = ?, updated_at = ?
                    WHERE id = ?
                """, (status, now, url_id))

    def get_all_urls(self, status: Optional[str] = None) -> List[Dict]:
        """Return all records, newest ``created_at`` first.

        Args:
            status: If truthy, only records with this exact status are returned.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            if status:
                cursor.execute("SELECT * FROM urls WHERE status = ? ORDER BY created_at DESC", (status,))
            else:
                cursor.execute("SELECT * FROM urls ORDER BY created_at DESC")
            return [dict(row) for row in cursor.fetchall()]
# Shared module-level instance. Constructing it runs Database._init_db, so the
# urls table and indexes are ensured as soon as this module is imported.
db: Database = Database()