halide / storage /database.py
Lonelyguyse1's picture
Upload Halide Space: app, models, pipeline, storage, ui packages
a823f6a verified
Raw
History Blame Contribute Delete
5.51 kB
"""SQLite database. Stores diagnostic history and user sessions.
Schema:
sessions(id, started_at, film_type, film_age_years, storage, scan_dpi)
diagnoses(id, session_id, created_at, defect_count, label_counts_json,
diagnosis_text, vision_seconds, reasoning_seconds, total_seconds,
vision_model, reasoning_model, raw_json)
"""
from __future__ import annotations
import json
import logging
import sqlite3
import time
import uuid
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Iterator
# Module-scoped logger; handlers and levels are configured by the application.
logger = logging.getLogger(__name__)

# parents[0] is the directory holding this file, parents[1] is the directory
# above it; the default database lives under "<that directory>/storage".
REPO_ROOT = Path(__file__).resolve().parents[1]
DEFAULT_DB_PATH = REPO_ROOT.joinpath("storage", "halide.db")
def get_db_path() -> Path:
    """Return the SQLite database file path, creating its parent directory.

    A non-empty ``HALIDE_DB_PATH`` environment variable overrides the
    default location ``DEFAULT_DB_PATH``.

    Returns:
        Path of the database file. The file itself may not exist yet, but
        its parent directory does once this function returns.
    """
    import os

    custom = os.getenv("HALIDE_DB_PATH")
    db_path = Path(custom) if custom else DEFAULT_DB_PATH
    # sqlite3.connect() creates a missing database file but not a missing
    # directory, so make sure the directory exists for overrides as well as
    # for the default location.
    db_path.parent.mkdir(parents=True, exist_ok=True)
    return db_path
# DDL script for the whole database. Every statement uses IF NOT EXISTS, so
# the script can be executed repeatedly against an existing database file.
# - sessions: one row of film/scan metadata.
# - diagnoses: one row per recorded pipeline result, linked to a session by
#   session_id; timing columns are REAL and the *_json columns hold JSON text.
# - Indexes cover lookups by session and ordering by creation time.
SCHEMA = """
CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
started_at REAL NOT NULL,
film_type TEXT NOT NULL,
film_age_years INTEGER NOT NULL,
storage TEXT NOT NULL,
scan_dpi INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS diagnoses (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
created_at REAL NOT NULL,
defect_count INTEGER NOT NULL,
label_counts_json TEXT NOT NULL,
diagnosis_text TEXT NOT NULL,
vision_seconds REAL NOT NULL,
reasoning_seconds REAL NOT NULL,
total_seconds REAL NOT NULL,
vision_model TEXT NOT NULL,
reasoning_model TEXT NOT NULL,
raw_json TEXT NOT NULL,
FOREIGN KEY (session_id) REFERENCES sessions(id)
);
CREATE INDEX IF NOT EXISTS idx_diagnoses_session ON diagnoses(session_id);
CREATE INDEX IF NOT EXISTS idx_diagnoses_created ON diagnoses(created_at);
"""
@contextmanager
def connect() -> Iterator[sqlite3.Connection]:
    """Open a connection to the database for the duration of a ``with`` block.

    The connection uses ``sqlite3.Row`` as its row factory, so result rows
    can be indexed by column name. Pending changes are committed when the
    block exits normally and rolled back when it raises; the connection is
    closed in both cases.

    Yields:
        An open ``sqlite3.Connection`` to the file from ``get_db_path()``.
    """
    conn = sqlite3.connect(str(get_db_path()))
    conn.row_factory = sqlite3.Row
    try:
        # SQLite keeps foreign-key enforcement off unless it is enabled on
        # each connection; without this the FOREIGN KEY declared on
        # diagnoses.session_id is parsed but never checked.
        conn.execute("PRAGMA foreign_keys = ON")
        yield conn
        conn.commit()
    except Exception:
        # Undo the partial transaction explicitly rather than relying on
        # close() discarding it.
        conn.rollback()
        raise
    finally:
        conn.close()
def init_db() -> None:
    """Apply ``SCHEMA`` to the database and log where the database lives.

    The schema script only contains ``IF NOT EXISTS`` statements, so calling
    this against an already-initialized database leaves existing data alone.
    """
    with connect() as db:
        db.executescript(SCHEMA)

    location = get_db_path()
    logger.info("DB initialized at %s", location)
def _text_or_default(value: Any, default: str) -> str:
    """Return *value* as text, or *default* when *value* is ``None``."""
    return default if value is None else str(value)


def record_diagnosis(result: dict) -> str:
    """Persist a full pipeline result. Returns the diagnosis id.

    Reads the ``film_metadata``, ``defects``, ``diagnosis`` and
    ``total_seconds`` entries of *result*; missing or ``None`` entries fall
    back to neutral defaults. Every call inserts one new ``sessions`` row and
    one ``diagnoses`` row that references it, inside a single transaction.

    Args:
        result: Pipeline output mapping. It is stored verbatim as JSON in
            ``raw_json``, so it must be JSON-serializable.

    Returns:
        The generated id (UUID4 string) of the new ``diagnoses`` row.

    Raises:
        TypeError: If *result* or its label counts cannot be serialized
            to JSON.
    """
    diagnosis_id = str(uuid.uuid4())
    session_id = str(uuid.uuid4())
    now = time.time()

    meta = result.get("film_metadata", {}) or {}
    defects = result.get("defects", {}) or {}
    diagnosis = result.get("diagnosis", {}) or {}

    # Build both rows before touching the database so that a conversion or
    # serialization error never leaves a transaction half-started.
    # Text columns are NOT NULL: a key that is present but set to None must
    # fall back to the default just like a missing key does.
    session_row = (
        session_id,
        now,
        _text_or_default(meta.get("film_type"), "Unknown"),
        int(meta.get("film_age_years", 0) or 0),
        _text_or_default(meta.get("storage"), "unknown"),
        int(meta.get("scan_resolution_dpi", 0) or 0),
    )
    diagnosis_row = (
        diagnosis_id,
        session_id,
        now,
        int(defects.get("defect_count", 0) or 0),
        json.dumps(defects.get("label_counts", {}) or {}),
        _text_or_default(diagnosis.get("diagnosis_text"), ""),
        float(defects.get("inference_seconds", 0.0) or 0.0),
        float(diagnosis.get("reasoning_seconds", 0.0) or 0.0),
        float(result.get("total_seconds", 0.0) or 0.0),
        _text_or_default(defects.get("model_path"), ""),
        _text_or_default(diagnosis.get("model_path"), ""),
        json.dumps(result),
    )

    with connect() as conn:
        # The session row goes first because the diagnosis row references it.
        conn.execute(
            """
            INSERT INTO sessions (id, started_at, film_type, film_age_years,
                                  storage, scan_dpi)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            session_row,
        )
        conn.execute(
            """
            INSERT INTO diagnoses (id, session_id, created_at, defect_count,
                                   label_counts_json, diagnosis_text,
                                   vision_seconds, reasoning_seconds,
                                   total_seconds, vision_model, reasoning_model,
                                   raw_json)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """,
            diagnosis_row,
        )

    logger.info("Recorded diagnosis %s (session %s)", diagnosis_id, session_id)
    return diagnosis_id
def list_recent(limit: int = 20) -> list[dict]:
    """Return the most recently created diagnoses, newest first.

    Each diagnosis is joined with its session so the film metadata is
    included alongside the diagnosis summary.

    Args:
        limit: Value bound to the query's ``LIMIT`` clause.

    Returns:
        A list of plain dicts with the keys ``id``, ``created_at``,
        ``film_type``, ``film_age_years``, ``storage``, ``scan_dpi``,
        ``defect_count``, ``label_counts`` (decoded from its JSON column),
        ``diagnosis_text`` and ``total_seconds``.
    """
    query = """
        SELECT d.id, d.created_at, s.film_type, s.film_age_years,
        s.storage, s.scan_dpi, d.defect_count, d.label_counts_json,
        d.diagnosis_text, d.total_seconds
        FROM diagnoses d
        JOIN sessions s ON s.id = d.session_id
        ORDER BY d.created_at DESC
        LIMIT ?
        """
    with connect() as db:
        fetched = db.execute(query, (limit,)).fetchall()

    # Convert sqlite3.Row objects into ordinary dicts, decoding the JSON column.
    return [
        {
            "id": row["id"],
            "created_at": row["created_at"],
            "film_type": row["film_type"],
            "film_age_years": row["film_age_years"],
            "storage": row["storage"],
            "scan_dpi": row["scan_dpi"],
            "defect_count": row["defect_count"],
            "label_counts": json.loads(row["label_counts_json"]),
            "diagnosis_text": row["diagnosis_text"],
            "total_seconds": row["total_seconds"],
        }
        for row in fetched
    ]
# Names exported by ``from <this module> import *``. ``connect`` and the
# module-level constants are not listed here.
__all__ = [
    "init_db",
    "record_diagnosis",
    "list_recent",
    "get_db_path",
]