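"""
Storage — SQL persistence for pipeline runs.

Uses SQLite, conventionally at ``runs/<run_id>/db.sqlite`` (the caller
supplies the database path).

Postgres support via the ``DATABASE_URL`` env var (e.g.
``postgresql://user:pass@host/db``) is planned but NOT yet implemented:
this module currently ignores ``DATABASE_URL`` and always uses SQLite.

Tables:
    normalized_records – all NormalizedRecord rows (one row per ``row_id``)
    stage_results      – per-stage summary + status (one row per stage)
    artifacts          – artifact metadata (one row per artifact name)
"""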
|
|
| from __future__ import annotations |
|
|
| import json |
| import logging |
| import os |
| import sqlite3 |
| from pathlib import Path |
| from typing import List, Optional |
|
|
| from engine.io_contract import Artifact, EngineOutput, NormalizedRecord |
|
|
# Module-level logger. The name is the fixed string "engine.storage" (not
# ``__name__``), so logging configuration must target that exact name.
logger = logging.getLogger("engine.storage")


# Schema script run by ``StorageBackend.connect()`` via ``executescript``.
# Every statement uses CREATE TABLE IF NOT EXISTS, so re-applying it to an
# existing database file is harmless.
#
# normalized_records: keyed on row_id. ``timestamp`` holds ISO-8601 text and
#     ``extra`` holds the JSON-encoded ``extra`` value of each record.
# stage_results: keyed on stage name; ``metadata`` holds JSON text.
# artifacts: keyed on artifact name; ``path`` holds the stringified path.
# The insert helpers use INSERT OR REPLACE against these primary keys, so a
# repeated key overwrites the earlier row.
_DDL = """
CREATE TABLE IF NOT EXISTS normalized_records (
    row_id TEXT PRIMARY KEY,
    source_file TEXT,
    source_type TEXT,
    timestamp TEXT,
    entity_name TEXT,
    entity_phone TEXT,
    entity_email TEXT,
    entity_ip TEXT,
    entity_domain TEXT,
    entity_hash TEXT,
    raw_text TEXT,
    extra TEXT
);

CREATE TABLE IF NOT EXISTS stage_results (
    stage TEXT PRIMARY KEY,
    status TEXT,
    summary TEXT,
    error TEXT,
    metadata TEXT
);

CREATE TABLE IF NOT EXISTS artifacts (
    name TEXT PRIMARY KEY,
    path TEXT,
    mime_type TEXT,
    description TEXT
);
"""
|
|
|
|
| |
| |
| |
|
|
class StorageBackend:
    """
    Thin wrapper around a SQLite connection for one pipeline run.

    Only raw ``sqlite3`` is implemented; Postgres is not supported yet. A
    future iteration can swap in SQLAlchemy / SQLModel for Postgres parity.

    The connection is opened lazily on first use of :attr:`conn`, or
    explicitly via :meth:`connect`. Each write method runs in its own
    transaction: it commits on success and rolls back if the statement
    fails, so a failed insert never leaves half-written rows pending for a
    later commit. Use the backend as a context manager to guarantee
    :meth:`close` is called::

        with StorageBackend(path) as storage:
            storage.insert_records(records)
    """

    def __init__(self, db_path: Path | str) -> None:
        """
        Args:
            db_path: Location of the SQLite database file. ``str`` paths are
                accepted and converted to :class:`~pathlib.Path`. The parent
                directory is created if it does not exist yet.
        """
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._conn: Optional[sqlite3.Connection] = None

    # -- lifecycle ---------------------------------------------------------

    def connect(self) -> None:
        """
        Open the SQLite connection and ensure the schema tables exist.

        Idempotent: if a connection is already open this does nothing, so an
        explicit ``connect()`` after lazy use of :attr:`conn` (or a second
        ``connect()``) does not leak a connection.

        Any exception from opening the file or applying the schema
        propagates; in that case no connection is left open.
        """
        if self._conn is not None:
            return
        logger.info("Connecting to SQLite: %s", self.db_path)
        conn = sqlite3.connect(str(self.db_path))
        try:
            conn.executescript(_DDL)
            conn.commit()
        except BaseException:
            # Don't leak the handle if the schema can't be applied (e.g. the
            # file exists but is not a SQLite database).
            conn.close()
            raise
        self._conn = conn

    def close(self) -> None:
        """Close the connection if one is open; safe to call repeatedly."""
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    def __enter__(self) -> StorageBackend:
        # The connection stays lazy; it is opened on first use of ``conn``.
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    @property
    def conn(self) -> sqlite3.Connection:
        """The open connection, connecting first if necessary."""
        if self._conn is None:
            self.connect()
        assert self._conn is not None  # narrowing for type checkers
        return self._conn

    # -- writes ------------------------------------------------------------

    def insert_records(self, records: List[NormalizedRecord]) -> int:
        """
        Upsert normalized records (INSERT OR REPLACE keyed on ``row_id``).

        The whole batch runs in one transaction. If any row fails to bind or
        insert, every row of the batch is rolled back and the exception
        propagates.

        Args:
            records: Records to store. ``source_type`` is stored as its
                ``.value`` ("" when falsy), ``timestamp`` as ISO-8601 text
                (NULL when falsy), and ``extra`` as JSON.

        Returns:
            Number of records written (0 for an empty list).
        """
        if not records:
            return 0
        sql = """
            INSERT OR REPLACE INTO normalized_records
                (row_id, source_file, source_type, timestamp,
                 entity_name, entity_phone, entity_email,
                 entity_ip, entity_domain, entity_hash,
                 raw_text, extra)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
        rows = [
            (
                r.row_id,
                r.source_file,
                r.source_type.value if r.source_type else "",
                r.timestamp.isoformat() if r.timestamp else None,
                r.entity_name,
                r.entity_phone,
                r.entity_email,
                r.entity_ip,
                r.entity_domain,
                r.entity_hash,
                r.raw_text,
                json.dumps(r.extra, ensure_ascii=False, default=str),
            )
            for r in records
        ]
        conn = self.conn
        # ``with conn`` commits on success and rolls back on error, so a row
        # that fails mid-batch cannot leave earlier rows pending for a later
        # commit made by another method.
        with conn:
            conn.executemany(sql, rows)
        logger.info("Inserted %d normalized records", len(rows))
        return len(rows)

    def insert_stage_result(self, output: EngineOutput) -> None:
        """
        Upsert the result row for ``output.stage`` (one row per stage).

        ``status`` is stored as ``output.status.value`` and ``metadata`` as
        JSON text. Commits on success and rolls back if the write fails.
        """
        sql = """
            INSERT OR REPLACE INTO stage_results
                (stage, status, summary, error, metadata)
            VALUES (?, ?, ?, ?, ?)
        """
        params = (
            output.stage,
            output.status.value,
            output.summary,
            output.error,
            json.dumps(output.metadata, ensure_ascii=False, default=str),
        )
        conn = self.conn
        with conn:
            conn.execute(sql, params)

    def insert_artifact(self, artifact: Artifact) -> None:
        """
        Upsert the metadata row for ``artifact.name`` (one row per name).

        ``path`` is stored as ``str(artifact.path)``. Commits on success and
        rolls back if the write fails.
        """
        sql = """
            INSERT OR REPLACE INTO artifacts
                (name, path, mime_type, description)
            VALUES (?, ?, ?, ?)
        """
        params = (
            artifact.name,
            str(artifact.path),
            artifact.mime_type,
            artifact.description,
        )
        conn = self.conn
        with conn:
            conn.execute(sql, params)

    # -- reads -------------------------------------------------------------

    def count_records(self) -> int:
        """Return the number of rows in ``normalized_records``."""
        cur = self.conn.execute("SELECT COUNT(*) FROM normalized_records")
        return cur.fetchone()[0]

    def fetch_all_records(self) -> List[dict]:
        """Return all normalized records as ``{column: value}`` dicts.

        ``extra`` is returned as the stored JSON text, not decoded.
        """
        return self._fetch_dicts("SELECT * FROM normalized_records")

    def fetch_stage_results(self) -> List[dict]:
        """Return all stage results as ``{column: value}`` dicts.

        ``metadata`` is returned as the stored JSON text, not decoded.
        """
        return self._fetch_dicts("SELECT * FROM stage_results")

    def _fetch_dicts(self, sql: str) -> List[dict]:
        """Run *sql* and return each result row as a column-keyed dict."""
        cur = self.conn.execute(sql)
        cols = [d[0] for d in cur.description]
        return [dict(zip(cols, row)) for row in cur.fetchall()]
|
|
|
|
def create_storage(db_path: Path) -> StorageBackend:
    """
    Build a StorageBackend for *db_path* with its connection already open.

    This is the same as constructing the backend and immediately calling
    ``connect()``, rather than relying on the lazy connection made on first
    use of ``conn``.
    """
    storage = StorageBackend(db_path)
    storage.connect()  # eager: open the file and create the schema now
    return storage
|
|