Pier-Jean's picture
Initial deploy: Docling Studio (local mode, port 7860)
5539271
"""SQLite database management — async via aiosqlite."""
from __future__ import annotations
import logging
import os
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
import aiosqlite
from infra.settings import settings
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Path of the SQLite database file. It is read from settings once, when this
# module is imported; every connection opened below uses this value.
DB_PATH = settings.db_path
# DDL script executed by init_db(). Every statement uses IF NOT EXISTS, so
# running it against an already-initialized database is harmless. For the same
# reason it never alters an existing table: columns introduced after a database
# was first created must be added separately (see _MIGRATIONS).
_SCHEMA = """
CREATE TABLE IF NOT EXISTS documents (
id TEXT PRIMARY KEY,
filename TEXT NOT NULL,
content_type TEXT,
file_size INTEGER,
page_count INTEGER,
storage_path TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS analysis_jobs (
id TEXT PRIMARY KEY,
document_id TEXT NOT NULL REFERENCES documents(id) ON DELETE CASCADE,
status TEXT NOT NULL DEFAULT 'PENDING',
content_markdown TEXT,
content_html TEXT,
pages_json TEXT,
document_json TEXT,
chunks_json TEXT,
error_message TEXT,
started_at TEXT,
completed_at TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_analysis_jobs_status ON analysis_jobs(status);
CREATE INDEX IF NOT EXISTS idx_analysis_jobs_created_at ON analysis_jobs(created_at);
CREATE INDEX IF NOT EXISTS idx_documents_created_at ON documents(created_at);
"""
# Column migrations for the analysis_jobs table, as (column name, DDL) pairs.
# The DDL of an entry is executed only when its column is absent from the
# table, so the list can be applied to databases of any age.
_MIGRATIONS: list[tuple[str, str]] = [
    (
        "document_json",
        "ALTER TABLE analysis_jobs ADD COLUMN document_json TEXT",
    ),
    (
        "chunks_json",
        "ALTER TABLE analysis_jobs ADD COLUMN chunks_json TEXT",
    ),
]
async def _run_migrations(db: aiosqlite.Connection) -> None:
    """Bring an older ``analysis_jobs`` table up to date.

    Reads the columns the table currently has and executes the DDL of every
    ``_MIGRATIONS`` entry whose column is absent, logging each addition. The
    connection is committed once at the end, whether or not anything was
    added.
    """
    info_cursor = await db.execute("PRAGMA table_info(analysis_jobs)")
    table_info = await info_cursor.fetchall()
    # Index 1 of each PRAGMA table_info row is the column name.
    present_columns = {info[1] for info in table_info}

    # The set of present columns is a snapshot taken before any DDL runs, so
    # the pending work can be decided up front.
    pending = [
        (column, statement)
        for column, statement in _MIGRATIONS
        if column not in present_columns
    ]
    for column, statement in pending:
        await db.execute(statement)
        logger.info("Migration: added column %s to analysis_jobs", column)

    await db.commit()
async def init_db() -> None:
    """Ensure the database file, its tables and its indexes exist.

    Creates the parent directory of ``DB_PATH`` when needed, applies
    ``_SCHEMA``, then runs the column migrations so that databases created by
    an earlier version get any missing columns.
    """
    parent_dir = os.path.dirname(DB_PATH)
    if not parent_dir:
        # A bare filename has no directory component; use the current directory.
        parent_dir = "."
    os.makedirs(parent_dir, exist_ok=True)

    async with aiosqlite.connect(DB_PATH) as conn:
        await conn.executescript(_SCHEMA)
        await _run_migrations(conn)
        await conn.commit()

    logger.info("Database initialized at %s", DB_PATH)
async def get_db() -> aiosqlite.Connection:
    """Open a new database connection with row factory and FK enforcement.

    Rows are returned as ``aiosqlite.Row`` objects, and foreign-key
    enforcement is switched on for this connection (SQLite leaves it off by
    default, per connection).

    Returns:
        An open connection. The caller owns it and must close it.

    Raises:
        Whatever ``aiosqlite`` raises while connecting or configuring the
        connection. If configuration fails after the connection was opened,
        the connection is closed before the error propagates.
    """
    db = await aiosqlite.connect(DB_PATH)
    try:
        db.row_factory = aiosqlite.Row
        await db.execute("PRAGMA foreign_keys = ON")
    except BaseException:
        # The caller never receives the handle on failure, so nobody else can
        # close it. BaseException is caught so that task cancellation during
        # the PRAGMA is also covered; the error is always re-raised.
        await db.close()
        raise
    return db
@asynccontextmanager
async def get_connection() -> AsyncIterator[aiosqlite.Connection]:
    """Yield a connection obtained from ``get_db`` and close it on exit.

    Intended for ``async with get_connection() as db:``. The connection is
    closed whether the body finishes normally or raises.
    """
    conn = await get_db()
    try:
        yield conn
    finally:
        # Runs on normal exit and on exceptions thrown into the generator.
        await conn.close()