"""
Storage — SQL persistence for pipeline runs.

Uses SQLite.  The database file lives at whatever ``db_path`` is handed to
:class:`StorageBackend` (the original note gave ``runs//db.sqlite``; the
middle path segment looks to have been lost — TODO confirm the layout).

NOTE(review): an earlier revision of this header said to set the
``DATABASE_URL`` env var for Postgres
(e.g. ``postgresql://user:pass@host/db``).  Nothing in this module reads that
variable, and the upserts below use SQLite-only ``INSERT OR REPLACE``, so
treat Postgres support as planned rather than implemented.

Tables:
    normalized_records – all NormalizedRecord rows
    stage_results      – per-stage summary + status
    artifacts          – artifact metadata
"""

from __future__ import annotations

import json
import logging
import os
import sqlite3
from pathlib import Path
from typing import List, Optional

from engine.io_contract import Artifact, EngineOutput, NormalizedRecord

logger = logging.getLogger("engine.storage")

# ---------------------------------------------------------------------------
# Schema DDL
#
# The CREATE TABLE statements use only TEXT columns and ``IF NOT EXISTS``, so
# re-running them on every connect is harmless.  The write statements further
# down are SQLite-specific (``INSERT OR REPLACE``).
# ---------------------------------------------------------------------------

_DDL = """
CREATE TABLE IF NOT EXISTS normalized_records (
    row_id        TEXT PRIMARY KEY,
    source_file   TEXT,
    source_type   TEXT,
    timestamp     TEXT,
    entity_name   TEXT,
    entity_phone  TEXT,
    entity_email  TEXT,
    entity_ip     TEXT,
    entity_domain TEXT,
    entity_hash   TEXT,
    raw_text      TEXT,
    extra         TEXT
);

CREATE TABLE IF NOT EXISTS stage_results (
    stage    TEXT PRIMARY KEY,
    status   TEXT,
    summary  TEXT,
    error    TEXT,
    metadata TEXT
);

CREATE TABLE IF NOT EXISTS artifacts (
    name        TEXT PRIMARY KEY,
    path        TEXT,
    mime_type   TEXT,
    description TEXT
);
"""


# ---------------------------------------------------------------------------
# Storage backend
# ---------------------------------------------------------------------------

class StorageBackend:
    """
    Thin wrapper around a SQLite (or Postgres) connection.

    For this iteration we use raw ``sqlite3``.  A future iteration can swap
    in SQLAlchemy / SQLModel for Postgres parity.

    The connection is opened lazily: the first access to :attr:`conn` calls
    :meth:`connect` if needed.  The object can also be used as a context
    manager, in which case the connection is closed on exit::

        with StorageBackend(path) as store:
            store.insert_records(records)
    """

    def __init__(self, db_path: Path):
        """Remember *db_path* and create its parent directory if missing.

        No connection is opened here; see :meth:`connect` / :attr:`conn`.
        """
        self.db_path = db_path
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._conn: Optional[sqlite3.Connection] = None

    # -- lifecycle -----------------------------------------------------------

    def connect(self) -> None:
        """Open the SQLite database and make sure the schema exists.

        If a connection is already open it is closed first, so calling this
        repeatedly never leaks a handle.  The new connection is stored on the
        instance only after the DDL has run successfully; if schema creation
        raises, the half-initialised connection is closed and the error
        propagates.
        """
        self.close()
        logger.info("Connecting to SQLite: %s", self.db_path)
        new_conn = sqlite3.connect(str(self.db_path))
        try:
            new_conn.executescript(_DDL)
            new_conn.commit()
        except Exception:
            # Do not keep (or leak) a connection whose schema setup failed.
            new_conn.close()
            raise
        self._conn = new_conn

    def close(self) -> None:
        """Close the connection if one is open; safe to call repeatedly."""
        if self._conn:
            self._conn.close()
            self._conn = None

    def __enter__(self) -> "StorageBackend":
        """Return ``self``; the connection itself is still opened lazily."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the connection; exceptions from the body are not suppressed."""
        self.close()

    @property
    def conn(self) -> sqlite3.Connection:
        """The live connection, opening it on first use."""
        if self._conn is None:
            self.connect()
        # connect() either sets _conn or raises; the assert only narrows the
        # Optional for type checkers.
        assert self._conn is not None
        return self._conn

    # -- writes --------------------------------------------------------------

    def insert_records(self, records: List[NormalizedRecord]) -> int:
        """Insert normalized records. Returns count inserted.

        Rows are upserted by ``row_id`` (``INSERT OR REPLACE``).  The whole
        batch runs in one transaction: it is committed on success and rolled
        back if any row fails, so a partial batch is never left pending for a
        later commit to pick up.

        An empty *records* list returns 0 without touching the database.
        """
        if not records:
            return 0

        sql = """
            INSERT OR REPLACE INTO normalized_records
                (row_id, source_file, source_type, timestamp,
                 entity_name, entity_phone, entity_email, entity_ip,
                 entity_domain, entity_hash, raw_text, extra)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
        rows = [
            (
                r.row_id,
                r.source_file,
                # A missing source_type is stored as "" (not NULL).
                r.source_type.value if r.source_type else "",
                # A missing timestamp is stored as NULL.
                r.timestamp.isoformat() if r.timestamp else None,
                r.entity_name,
                r.entity_phone,
                r.entity_email,
                r.entity_ip,
                r.entity_domain,
                r.entity_hash,
                r.raw_text,
                # default=str stringifies any value json cannot encode.
                json.dumps(r.extra, ensure_ascii=False, default=str),
            )
            for r in records
        ]

        conn = self.conn
        with conn:  # commit on success, rollback on exception
            conn.executemany(sql, rows)
        logger.info("Inserted %d normalized records", len(rows))
        return len(rows)

    def insert_stage_result(self, output: EngineOutput) -> None:
        """Upsert a stage result row, keyed by ``output.stage``.

        ``output.metadata`` is stored as a JSON string.  The write is
        committed on success and rolled back on failure.
        """
        sql = """
            INSERT OR REPLACE INTO stage_results
                (stage, status, summary, error, metadata)
            VALUES (?, ?, ?, ?, ?)
        """
        params = (
            output.stage,
            output.status.value,
            output.summary,
            output.error,
            json.dumps(output.metadata, ensure_ascii=False, default=str),
        )
        conn = self.conn
        with conn:  # commit on success, rollback on exception
            conn.execute(sql, params)

    def insert_artifact(self, artifact: Artifact) -> None:
        """Upsert an artifact metadata row, keyed by ``artifact.name``.

        ``artifact.path`` is stored via ``str()``.  The write is committed on
        success and rolled back on failure.
        """
        sql = """
            INSERT OR REPLACE INTO artifacts
                (name, path, mime_type, description)
            VALUES (?, ?, ?, ?)
        """
        params = (
            artifact.name,
            str(artifact.path),
            artifact.mime_type,
            artifact.description,
        )
        conn = self.conn
        with conn:  # commit on success, rollback on exception
            conn.execute(sql, params)

    # -- reads ---------------------------------------------------------------

    def _fetch_dicts(self, query: str) -> List[dict]:
        """Run *query* and return each row as a ``{column_name: value}`` dict."""
        cur = self.conn.execute(query)
        cols = [d[0] for d in cur.description]
        return [dict(zip(cols, row)) for row in cur.fetchall()]

    def count_records(self) -> int:
        """Return the number of rows in ``normalized_records``."""
        cur = self.conn.execute("SELECT COUNT(*) FROM normalized_records")
        return cur.fetchone()[0]

    def fetch_all_records(self) -> List[dict]:
        """Return all normalized records as dicts.

        Values come back as stored; in particular ``extra`` is the JSON
        string written by :meth:`insert_records`, not a decoded object.
        """
        return self._fetch_dicts("SELECT * FROM normalized_records")

    def fetch_stage_results(self) -> List[dict]:
        """Return all stage result rows as dicts (``metadata`` is a JSON string)."""
        return self._fetch_dicts("SELECT * FROM stage_results")


def create_storage(db_path: Path) -> StorageBackend:
    """Factory: create and connect a StorageBackend.

    The returned backend already has an open connection with the schema in
    place; the caller is responsible for calling ``close()`` (or using the
    backend in a ``with`` block).
    """
    backend = StorageBackend(db_path)
    backend.connect()
    return backend