# WebRAG / src/database.py
# Arun21102003
# Initial clean commit
# 97f9138
import json
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timezone
from typing import Dict, List, Optional

from src.config import config
class Database:
    """SQLite-backed store for URL ingestion records kept in the ``urls`` table.

    Every public method opens its own short-lived connection through
    :meth:`get_connection`, so an instance holds no open handle between calls.
    """

    def __init__(self, db_path: Optional[str] = None):
        """Bind to a database file and make sure the schema exists.

        Args:
            db_path: Path to the SQLite file. When falsy, falls back to
                ``config.DATABASE_PATH``.
        """
        self.db_path = db_path or config.DATABASE_PATH
        self._init_db()

    @staticmethod
    def _utcnow_iso() -> str:
        """Return the current UTC time as a naive ISO-8601 string.

        ``datetime.utcnow()`` is deprecated since Python 3.12. Dropping the
        tzinfo after ``datetime.now(timezone.utc)`` yields the identical
        string format (no ``+00:00`` suffix), so new timestamps compare and
        sort consistently with rows written by earlier versions.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

    @contextmanager
    def get_connection(self):
        """Yield a connection that commits on success and rolls back on error.

        Rows come back as ``sqlite3.Row`` so callers can turn them into dicts
        with ``dict(row)``. The connection is always closed on exit.

        Raises:
            Whatever the ``with`` body (or the final ``commit``) raised,
            re-raised after the transaction is rolled back.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
            conn.commit()
        except Exception:
            conn.rollback()
            # Bare ``raise`` re-raises the active exception unchanged.
            raise
        finally:
            conn.close()

    def _init_db(self):
        """Create the ``urls`` table and its indexes if they do not exist yet."""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS urls (
                    id TEXT PRIMARY KEY,
                    url TEXT NOT NULL UNIQUE,
                    status TEXT NOT NULL,
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL,
                    completed_at TEXT,
                    error_message TEXT,
                    chunk_count INTEGER DEFAULT 0,
                    metadata TEXT
                )
            """)
            # Support filtering by status and ordering by creation time.
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_url_status ON urls(status)
            """)
            cursor.execute("""
                CREATE INDEX IF NOT EXISTS idx_url_created ON urls(created_at)
            """)

    def create_url_record(self, url_id: str, url: str) -> Dict:
        """Insert a new record in ``pending`` state and return its fields.

        Args:
            url_id: Primary-key identifier for the record.
            url: The URL being tracked; must be unique across the table.

        Returns:
            A dict with ``id``, ``url``, ``status``, ``created_at`` and
            ``updated_at`` matching what was inserted.

        Raises:
            sqlite3.IntegrityError: if ``url_id`` or ``url`` already exists.
        """
        now = self._utcnow_iso()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                INSERT INTO urls (id, url, status, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?)
            """, (url_id, url, "pending", now, now))
        return {
            "id": url_id,
            "url": url,
            "status": "pending",
            "created_at": now,
            "updated_at": now
        }

    def get_url_record(self, url_id: str) -> Optional[Dict]:
        """Return the full row for ``url_id`` as a dict, or ``None`` if absent.

        ``metadata`` is returned as the raw stored JSON text (or ``None``).
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT * FROM urls WHERE id = ?", (url_id,))
            row = cursor.fetchone()
            return dict(row) if row else None

    def update_url_status(self, url_id: str, status: str,
                          error_message: Optional[str] = None,
                          chunk_count: Optional[int] = None,
                          metadata: Optional[Dict] = None):
        """Set ``status`` on a record and bump ``updated_at``.

        Extra columns written depend on ``status``:

        * ``"completed"`` also sets ``completed_at``, ``chunk_count``
          (``None`` is stored as 0) and ``metadata`` (JSON-encoded; a falsy
          value is stored as NULL).
        * ``"failed"`` also sets ``error_message``.
        * Any other status only touches ``status`` and ``updated_at``.

        An unknown ``url_id`` updates zero rows and is not reported.
        """
        now = self._utcnow_iso()
        with self.get_connection() as conn:
            cursor = conn.cursor()
            if status == "completed":
                cursor.execute("""
                    UPDATE urls
                    SET status = ?, updated_at = ?, completed_at = ?,
                        chunk_count = ?, metadata = ?
                    WHERE id = ?
                """, (status, now, now, chunk_count or 0,
                      json.dumps(metadata) if metadata else None, url_id))
            elif status == "failed":
                cursor.execute("""
                    UPDATE urls
                    SET status = ?, updated_at = ?, error_message = ?
                    WHERE id = ?
                """, (status, now, error_message, url_id))
            else:
                cursor.execute("""
                    UPDATE urls
                    SET status = ?, updated_at = ?
                    WHERE id = ?
                """, (status, now, url_id))

    def get_all_urls(self, status: Optional[str] = None) -> List[Dict]:
        """Return all records, newest first, optionally filtered by ``status``.

        A falsy ``status`` (``None`` or ``""``) disables the filter.
        """
        with self.get_connection() as conn:
            cursor = conn.cursor()
            if status:
                cursor.execute("SELECT * FROM urls WHERE status = ? ORDER BY created_at DESC", (status,))
            else:
                cursor.execute("SELECT * FROM urls ORDER BY created_at DESC")
            return [dict(row) for row in cursor.fetchall()]
# Shared module-level instance. Constructing it at import time resolves the
# path from config.DATABASE_PATH and runs the CREATE TABLE / CREATE INDEX
# statements, so importing this module touches the database file.
db = Database()