"""SQLite database. Stores diagnostic history and user sessions. Schema: sessions(id, started_at, film_type, film_age_years, storage, scan_dpi) diagnoses(id, session_id, created_at, defect_count, label_counts_json, diagnosis_text, vision_seconds, reasoning_seconds, total_seconds, vision_model, reasoning_model, raw_json) """ from __future__ import annotations import json import logging import sqlite3 import time import uuid from contextlib import contextmanager from pathlib import Path from typing import Any, Iterator logger = logging.getLogger(__name__) REPO_ROOT = Path(__file__).resolve().parents[1] DEFAULT_DB_PATH = REPO_ROOT / "storage" / "halide.db" def get_db_path() -> Path: import os custom = os.getenv("HALIDE_DB_PATH") if custom: return Path(custom) DEFAULT_DB_PATH.parent.mkdir(parents=True, exist_ok=True) return DEFAULT_DB_PATH SCHEMA = """ CREATE TABLE IF NOT EXISTS sessions ( id TEXT PRIMARY KEY, started_at REAL NOT NULL, film_type TEXT NOT NULL, film_age_years INTEGER NOT NULL, storage TEXT NOT NULL, scan_dpi INTEGER NOT NULL ); CREATE TABLE IF NOT EXISTS diagnoses ( id TEXT PRIMARY KEY, session_id TEXT NOT NULL, created_at REAL NOT NULL, defect_count INTEGER NOT NULL, label_counts_json TEXT NOT NULL, diagnosis_text TEXT NOT NULL, vision_seconds REAL NOT NULL, reasoning_seconds REAL NOT NULL, total_seconds REAL NOT NULL, vision_model TEXT NOT NULL, reasoning_model TEXT NOT NULL, raw_json TEXT NOT NULL, FOREIGN KEY (session_id) REFERENCES sessions(id) ); CREATE INDEX IF NOT EXISTS idx_diagnoses_session ON diagnoses(session_id); CREATE INDEX IF NOT EXISTS idx_diagnoses_created ON diagnoses(created_at); """ @contextmanager def connect() -> Iterator[sqlite3.Connection]: db_path = get_db_path() conn = sqlite3.connect(str(db_path)) conn.row_factory = sqlite3.Row try: yield conn conn.commit() finally: conn.close() def init_db() -> None: with connect() as conn: conn.executescript(SCHEMA) logger.info("DB initialized at %s", get_db_path()) def record_diagnosis(result: dict) -> str: """Persist a full pipeline result. Returns the diagnosis id.""" diagnosis_id = str(uuid.uuid4()) session_id = str(uuid.uuid4()) now = time.time() meta = result.get("film_metadata", {}) or {} defects = result.get("defects", {}) or {} diagnosis = result.get("diagnosis", {}) or {} with connect() as conn: conn.execute( """ INSERT INTO sessions (id, started_at, film_type, film_age_years, storage, scan_dpi) VALUES (?, ?, ?, ?, ?, ?) """, ( session_id, now, meta.get("film_type", "Unknown"), int(meta.get("film_age_years", 0) or 0), meta.get("storage", "unknown"), int(meta.get("scan_resolution_dpi", 0) or 0), ), ) conn.execute( """ INSERT INTO diagnoses (id, session_id, created_at, defect_count, label_counts_json, diagnosis_text, vision_seconds, reasoning_seconds, total_seconds, vision_model, reasoning_model, raw_json) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( diagnosis_id, session_id, now, int(defects.get("defect_count", 0) or 0), json.dumps(defects.get("label_counts", {}) or {}), diagnosis.get("diagnosis_text", ""), float(defects.get("inference_seconds", 0.0) or 0.0), float(diagnosis.get("reasoning_seconds", 0.0) or 0.0), float(result.get("total_seconds", 0.0) or 0.0), defects.get("model_path", ""), diagnosis.get("model_path", ""), json.dumps(result), ), ) logger.info("Recorded diagnosis %s (session %s)", diagnosis_id, session_id) return diagnosis_id def list_recent(limit: int = 20) -> list[dict]: with connect() as conn: rows = conn.execute( """ SELECT d.id, d.created_at, s.film_type, s.film_age_years, s.storage, s.scan_dpi, d.defect_count, d.label_counts_json, d.diagnosis_text, d.total_seconds FROM diagnoses d JOIN sessions s ON s.id = d.session_id ORDER BY d.created_at DESC LIMIT ? """, (limit,), ).fetchall() out: list[dict] = [] for r in rows: out.append( { "id": r["id"], "created_at": r["created_at"], "film_type": r["film_type"], "film_age_years": r["film_age_years"], "storage": r["storage"], "scan_dpi": r["scan_dpi"], "defect_count": r["defect_count"], "label_counts": json.loads(r["label_counts_json"]), "diagnosis_text": r["diagnosis_text"], "total_seconds": r["total_seconds"], } ) return out __all__ = [ "init_db", "record_diagnosis", "list_recent", "get_db_path", ]