"""SQLite database. Stores diagnostic history and user sessions. Schema: sessions(id, started_at, film_type, film_age_years, storage, scan_dpi) diagnoses(id, session_id, created_at, defect_count, label_counts_json, diagnosis_text, vision_seconds, reasoning_seconds, total_seconds, vision_model, reasoning_model, raw_json) """ from __future__ import annotations import json import logging import sqlite3 import time import uuid from contextlib import contextmanager from pathlib import Path from typing import Any, Iterator from config import get_app_config logger = logging.getLogger(__name__) REPO_ROOT = Path(__file__).resolve().parents[1] DEFAULT_DB_PATH = REPO_ROOT / "storage" / "halide.db" _INITIALIZED_DB_PATHS: set[Path] = set() def get_db_path() -> Path: db_path = get_app_config().db_path db_path.parent.mkdir(parents=True, exist_ok=True) return db_path SCHEMA = """ CREATE TABLE IF NOT EXISTS sessions ( id TEXT PRIMARY KEY, started_at REAL NOT NULL, film_type TEXT NOT NULL, film_age_years INTEGER NOT NULL, storage TEXT NOT NULL, scan_dpi INTEGER NOT NULL ); CREATE TABLE IF NOT EXISTS diagnoses ( id TEXT PRIMARY KEY, session_id TEXT NOT NULL, created_at REAL NOT NULL, defect_count INTEGER NOT NULL, label_counts_json TEXT NOT NULL, diagnosis_text TEXT NOT NULL, vision_seconds REAL NOT NULL, reasoning_seconds REAL NOT NULL, total_seconds REAL NOT NULL, vision_model TEXT NOT NULL, reasoning_model TEXT NOT NULL, raw_json TEXT NOT NULL, FOREIGN KEY (session_id) REFERENCES sessions(id) ); CREATE INDEX IF NOT EXISTS idx_diagnoses_session ON diagnoses(session_id); CREATE INDEX IF NOT EXISTS idx_diagnoses_created ON diagnoses(created_at); """ @contextmanager def connect() -> Iterator[sqlite3.Connection]: db_path = get_db_path() conn = sqlite3.connect(str(db_path)) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON") conn.execute("PRAGMA journal_mode = WAL") try: yield conn conn.commit() finally: conn.close() def init_db() -> None: with connect() as conn: conn.executescript(SCHEMA) db_path = get_db_path() _INITIALIZED_DB_PATHS.add(db_path) logger.info("DB initialized at %s", db_path) def _ensure_db_initialized() -> None: db_path = get_db_path() if db_path in _INITIALIZED_DB_PATHS and db_path.exists(): return init_db() def record_diagnosis(result: dict) -> str: """Persist a full pipeline result. Returns the diagnosis id.""" _ensure_db_initialized() diagnosis_id = str(uuid.uuid4()) session_id = str(uuid.uuid4()) now = time.time() meta = result.get("film_metadata", {}) or {} defects = result.get("defects", {}) or {} diagnosis = result.get("diagnosis", {}) or {} with connect() as conn: conn.execute( """ INSERT INTO sessions (id, started_at, film_type, film_age_years, storage, scan_dpi) VALUES (?, ?, ?, ?, ?, ?) """, ( session_id, now, meta.get("film_type", "Unknown"), int(meta.get("film_age_years", 0) or 0), meta.get("storage", "unknown"), int(meta.get("scan_resolution_dpi", 0) or 0), ), ) conn.execute( """ INSERT INTO diagnoses (id, session_id, created_at, defect_count, label_counts_json, diagnosis_text, vision_seconds, reasoning_seconds, total_seconds, vision_model, reasoning_model, raw_json) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( diagnosis_id, session_id, now, int(defects.get("defect_count", 0) or 0), json.dumps(defects.get("label_counts", {}) or {}), diagnosis.get("diagnosis_text", ""), float(defects.get("inference_seconds", 0.0) or 0.0), float(diagnosis.get("reasoning_seconds", 0.0) or 0.0), float(result.get("total_seconds", 0.0) or 0.0), defects.get("model_path", ""), diagnosis.get("model_path", ""), json.dumps(result), ), ) logger.info("Recorded diagnosis %s (session %s)", diagnosis_id, session_id) return diagnosis_id def _decode_raw_json(text: str) -> dict[str, Any]: try: parsed = json.loads(text) except json.JSONDecodeError: return {} return parsed if isinstance(parsed, dict) else {} def _row_to_diagnosis(row: sqlite3.Row) -> dict[str, Any]: raw = _decode_raw_json(row["raw_json"]) meta = raw.get("film_metadata", {}) or {} return { "id": row["id"], "created_at": row["created_at"], "film_type": row["film_type"], "film_age_years": row["film_age_years"], "storage": row["storage"], "scan_dpi": row["scan_dpi"], "metadata_confidence": meta.get("metadata_confidence", "low"), "defect_count": row["defect_count"], "label_counts": json.loads(row["label_counts_json"]), "diagnosis_text": row["diagnosis_text"], "vision_seconds": row["vision_seconds"], "reasoning_seconds": row["reasoning_seconds"], "total_seconds": row["total_seconds"], "vision_model": row["vision_model"], "reasoning_model": row["reasoning_model"], "raw_json": raw, } def list_recent(limit: int = 20) -> list[dict]: _ensure_db_initialized() with connect() as conn: rows = conn.execute( """ SELECT d.id, d.created_at, s.film_type, s.film_age_years, s.storage, s.scan_dpi, d.defect_count, d.label_counts_json, d.diagnosis_text, d.vision_seconds, d.reasoning_seconds, d.total_seconds, d.vision_model, d.reasoning_model, d.raw_json FROM diagnoses d JOIN sessions s ON s.id = d.session_id ORDER BY d.created_at DESC LIMIT ? """, (limit,), ).fetchall() return [_row_to_diagnosis(r) for r in rows] def get_diagnosis(diagnosis_id: str) -> dict[str, Any] | None: """Return one persisted diagnosis, including its full pipeline JSON.""" if not diagnosis_id: return None _ensure_db_initialized() with connect() as conn: row = conn.execute( """ SELECT d.id, d.created_at, s.film_type, s.film_age_years, s.storage, s.scan_dpi, d.defect_count, d.label_counts_json, d.diagnosis_text, d.vision_seconds, d.reasoning_seconds, d.total_seconds, d.vision_model, d.reasoning_model, d.raw_json FROM diagnoses d JOIN sessions s ON s.id = d.session_id WHERE d.id = ? """, (diagnosis_id,), ).fetchone() if row is None: return None return _row_to_diagnosis(row) __all__ = [ "init_db", "record_diagnosis", "list_recent", "get_diagnosis", "get_db_path", ]