"""SQLite-backed persistence for URL processing records."""

import sqlite3
import json
from datetime import datetime, timezone
from typing import Optional, List, Dict, Iterator
from contextlib import contextmanager

from src.config import config


def _utc_now_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    ``datetime.utcnow()`` is deprecated since Python 3.12.  Taking an aware
    UTC timestamp and dropping ``tzinfo`` produces the same text (no
    ``+00:00`` suffix), so stored values keep one format and continue to
    sort correctly as text.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


class Database:
    """Thin wrapper around a SQLite file holding one ``urls`` table.

    Every public method opens its own short-lived connection through
    :meth:`get_connection`, so no connection object is kept on the instance.
    """

    def __init__(self, db_path: Optional[str] = None):
        """Remember the database location and make sure the schema exists.

        Args:
            db_path: Path handed to ``sqlite3.connect``.  Any falsy value
                (``None`` or ``""``) falls back to ``config.DATABASE_PATH``.
        """
        self.db_path = db_path or config.DATABASE_PATH
        self._init_db()

    @contextmanager
    def get_connection(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection that commits on success and rolls back on error.

        Rows are returned as ``sqlite3.Row`` so callers can convert them with
        ``dict(row)``.  The connection is always closed when the ``with``
        block exits.

        Raises:
            Exception: Whatever the ``with`` body (or the commit) raised, after
                the transaction has been rolled back.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            # Bare ``raise`` keeps the original traceback intact.
            raise
        finally:
            conn.close()

    def _init_db(self):
        """Create the ``urls`` table and its indexes if they are missing."""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS urls (
                    id TEXT PRIMARY KEY,
                    url TEXT NOT NULL UNIQUE,
                    status TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL,
                    completed_at TEXT,
                    error_message TEXT,
                    chunk_count INTEGER DEFAULT 0,
                    metadata TEXT
                )
            """)
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_url_status ON urls(status)
            """)
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_url_created ON urls(created_at)
            """)

    def create_url_record(self, url_id: str, url: str) -> Dict:
        """Insert a new row with status ``"pending"`` and return its fields.

        Args:
            url_id: Primary key for the new row.
            url: URL to track; the column is declared ``UNIQUE``.

        Returns:
            A dict with ``id``, ``url``, ``status``, ``created_at`` and
            ``updated_at`` exactly as written to the table.

        Raises:
            sqlite3.IntegrityError: If ``url_id`` or ``url`` already exists.
        """
        now = _utc_now_iso()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO urls (id, url, status, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?)
            """, (url_id, url, "pending", now, now))
        return {
            "id": url_id,
            "url": url,
            "status": "pending",
            "created_at": now,
            "updated_at": now
        }

    def get_url_record(self, url_id: str) -> Optional[Dict]:
        """Fetch a single row by primary key.

        Returns:
            The row as a plain dict (``metadata`` is returned as stored, i.e.
            JSON text or ``None``), or ``None`` when no row matches.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM urls WHERE id = ?", (url_id,))
            row = cursor.fetchone()
            if row:
                return dict(row)
            return None

    def update_url_status(self, url_id: str, status: str,
                          error_message: Optional[str] = None,
                          chunk_count: Optional[int] = None,
                          metadata: Optional[Dict] = None):
        """Set a row's status and the columns associated with that status.

        ``updated_at`` is refreshed in every case.  Which optional arguments
        are persisted depends on ``status``:

        * ``"completed"`` - also writes ``completed_at``, ``chunk_count``
          (``None`` is stored as ``0``) and ``metadata`` (JSON-encoded; a
          missing or empty dict is stored as ``NULL``).  ``error_message`` is
          left untouched.
        * ``"failed"`` - also writes ``error_message``; ``chunk_count`` and
          ``metadata`` are ignored.
        * anything else - only ``status`` and ``updated_at`` change.

        An unknown ``url_id`` updates zero rows and is not reported.
        """
        now = _utc_now_iso()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            if status == "completed":
                cursor.execute("""
                    UPDATE urls
                    SET status = ?, updated_at = ?, completed_at = ?,
                        chunk_count = ?, metadata = ?
                    WHERE id = ?
                """, (status, now, now, chunk_count or 0,
                      json.dumps(metadata) if metadata else None, url_id))
            elif status == "failed":
                cursor.execute("""
                    UPDATE urls
                    SET status = ?, updated_at = ?, error_message = ?
                    WHERE id = ?
                """, (status, now, error_message, url_id))
            else:
                cursor.execute("""
                    UPDATE urls
                    SET status = ?, updated_at = ?
                    WHERE id = ?
                """, (status, now, url_id))

    def get_all_urls(self, status: Optional[str] = None) -> List[Dict]:
        """Return rows as dicts, newest ``created_at`` first.

        Args:
            status: When truthy, only rows with this exact status are
                returned; ``None`` or ``""`` returns every row.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            if status:
                cursor.execute(
                    "SELECT * FROM urls WHERE status = ? ORDER BY created_at DESC",
                    (status,),
                )
            else:
                cursor.execute("SELECT * FROM urls ORDER BY created_at DESC")
            return [dict(row) for row in cursor.fetchall()]


# Module-level shared instance; constructing it creates the schema at import.
db = Database()