# mod-osint/engine/storage.py
# Hosting-page header: moddux, commit b75c637 "deploy: HF sanitized GUI snapshot"
"""
Storage — SQL persistence for pipeline runs.
Uses SQLite by default (``runs/<run_id>/db.sqlite``).
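Postgres via a ``DATABASE_URL`` env var (e.g. ``postgresql://user:pass@host/db``)
is the intended alternative, but note that nothing in this module reads
``DATABASE_URL`` yet: the backend below always opens SQLite through ``sqlite3``.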
Tables:
normalized_records – all NormalizedRecord rows
stage_results – per-stage summary + status
artifacts – artifact metadata
"""
from __future__ import annotations
import json
import logging
import os
import sqlite3
from pathlib import Path
from typing import List, Optional
from engine.io_contract import Artifact, EngineOutput, NormalizedRecord
# Module-level logger. The name is the fixed string "engine.storage" (it is
# not derived from __name__).
logger = logging.getLogger("engine.storage")
# ---------------------------------------------------------------------------
# Schema DDL (SQLite-compatible, works with Postgres too)
# ---------------------------------------------------------------------------
# This script is passed to ``executescript`` by StorageBackend.connect().
# Every statement uses CREATE TABLE IF NOT EXISTS, so running it against an
# already-initialised database leaves existing tables untouched.
#
# Column notes, based on how the insert methods in this module fill them:
#   * Each table has a TEXT primary key (row_id / stage / name); the inserts
#     use INSERT OR REPLACE, so writing an existing key overwrites that row.
#   * normalized_records.extra and stage_results.metadata receive JSON text
#     produced by json.dumps(..., ensure_ascii=False, default=str).
#   * normalized_records.timestamp receives the result of
#     ``r.timestamp.isoformat()``, or NULL when the record has no timestamp.
#   * normalized_records.source_type receives ``r.source_type.value``, or the
#     empty string when the record has no source type.
_DDL = """
CREATE TABLE IF NOT EXISTS normalized_records (
row_id TEXT PRIMARY KEY,
source_file TEXT,
source_type TEXT,
timestamp TEXT,
entity_name TEXT,
entity_phone TEXT,
entity_email TEXT,
entity_ip TEXT,
entity_domain TEXT,
entity_hash TEXT,
raw_text TEXT,
extra TEXT
);
CREATE TABLE IF NOT EXISTS stage_results (
stage TEXT PRIMARY KEY,
status TEXT,
summary TEXT,
error TEXT,
metadata TEXT
);
CREATE TABLE IF NOT EXISTS artifacts (
name TEXT PRIMARY KEY,
path TEXT,
mime_type TEXT,
description TEXT
);
"""
# ---------------------------------------------------------------------------
# Storage backend
# ---------------------------------------------------------------------------
class StorageBackend:
    """
    Thin wrapper around a SQLite connection holding one pipeline run's data.

    For this iteration only raw ``sqlite3`` is used.  A future iteration can
    swap in SQLAlchemy / SQLModel for Postgres parity.

    The connection is opened lazily the first time :attr:`conn` is accessed,
    or eagerly via :meth:`connect`.  Every write method runs inside its own
    transaction: it is committed when the statement succeeds and rolled back
    when it raises, so a failed batch never leaves half-written rows waiting
    to be committed by a later call.

    Instances can be used as context managers; leaving the ``with`` block
    calls :meth:`close`.
    """

    def __init__(self, db_path: Path):
        """
        Remember the database location and make sure its directory exists.

        Args:
            db_path: Location of the SQLite file.  A ``str`` or other
                path-like value is accepted and converted to ``Path``.
        """
        # Coerce so that callers passing a plain string do not crash on
        # ``.parent`` below.
        self.db_path = Path(db_path)
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self._conn: Optional[sqlite3.Connection] = None

    # -- lifecycle -----------------------------------------------------------
    def connect(self) -> None:
        """
        Open the SQLite file and create the schema if it is missing.

        Calling this while a connection is already open closes the old
        connection first instead of leaking it.  ``self._conn`` is only
        assigned once the schema script has run, so a failure here never
        leaves a schema-less connection behind.

        Raises:
            sqlite3.Error: if the file cannot be opened or the DDL fails.
        """
        self.close()
        logger.info("Connecting to SQLite: %s", self.db_path)
        new_conn = sqlite3.connect(str(self.db_path))
        try:
            new_conn.executescript(_DDL)
            new_conn.commit()
        except Exception:
            # Do not keep (or leak) a connection whose schema setup failed.
            new_conn.close()
            raise
        self._conn = new_conn

    def close(self) -> None:
        """Close the connection if one is open; safe to call repeatedly."""
        if self._conn is not None:
            self._conn.close()
            self._conn = None

    def __enter__(self) -> "StorageBackend":
        """Return ``self``; the connection is still opened lazily."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the connection; exceptions from the block propagate."""
        self.close()

    @property
    def conn(self) -> sqlite3.Connection:
        """The live connection, opened on first access."""
        if self._conn is None:
            self.connect()
        # connect() either sets _conn or raises; the assert only narrows the
        # Optional type for static checkers.
        assert self._conn is not None
        return self._conn

    # -- writes --------------------------------------------------------------
    def insert_records(self, records: List[NormalizedRecord]) -> int:
        """
        Upsert normalized records as one atomic batch.

        Rows are written with ``INSERT OR REPLACE``, so a record whose
        ``row_id`` already exists overwrites the stored row.

        Args:
            records: Records to store.  An empty list is a no-op.

        Returns:
            Number of rows submitted (``len(records)``).

        Raises:
            sqlite3.Error: if any row cannot be written; in that case the
                whole batch is rolled back.
        """
        if not records:
            return 0
        sql = """
            INSERT OR REPLACE INTO normalized_records
            (row_id, source_file, source_type, timestamp,
            entity_name, entity_phone, entity_email,
            entity_ip, entity_domain, entity_hash,
            raw_text, extra)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
        rows = [
            (
                r.row_id,
                r.source_file,
                r.source_type.value if r.source_type else "",
                r.timestamp.isoformat() if r.timestamp else None,
                r.entity_name,
                r.entity_phone,
                r.entity_email,
                r.entity_ip,
                r.entity_domain,
                r.entity_hash,
                r.raw_text,
                # ``extra`` is stored as JSON text; values json cannot encode
                # natively fall back to their str() form.
                json.dumps(r.extra, ensure_ascii=False, default=str),
            )
            for r in records
        ]
        conn = self.conn
        # The connection context manager commits on success and rolls back
        # on error, keeping the batch all-or-nothing.
        with conn:
            conn.executemany(sql, rows)
        logger.info("Inserted %d normalized records", len(rows))
        return len(rows)

    def insert_stage_result(self, output: EngineOutput) -> None:
        """
        Upsert the result row for ``output.stage``.

        Raises:
            sqlite3.Error: if the write fails (the transaction is rolled back).
        """
        sql = """
            INSERT OR REPLACE INTO stage_results
            (stage, status, summary, error, metadata)
            VALUES (?, ?, ?, ?, ?)
        """
        params = (
            output.stage,
            output.status.value,
            output.summary,
            output.error,
            json.dumps(output.metadata, ensure_ascii=False, default=str),
        )
        conn = self.conn
        with conn:
            conn.execute(sql, params)

    def insert_artifact(self, artifact: Artifact) -> None:
        """
        Upsert the metadata row for ``artifact.name``.

        Raises:
            sqlite3.Error: if the write fails (the transaction is rolled back).
        """
        sql = """
            INSERT OR REPLACE INTO artifacts
            (name, path, mime_type, description)
            VALUES (?, ?, ?, ?)
        """
        params = (
            artifact.name,
            str(artifact.path),
            artifact.mime_type,
            artifact.description,
        )
        conn = self.conn
        with conn:
            conn.execute(sql, params)

    # -- reads ---------------------------------------------------------------
    def _fetch_dicts(self, sql: str) -> List[dict]:
        """Run *sql* and return each result row as a column-name -> value dict."""
        cur = self.conn.execute(sql)
        cols = [d[0] for d in cur.description]
        return [dict(zip(cols, row)) for row in cur.fetchall()]

    def count_records(self) -> int:
        """Return the number of rows in ``normalized_records``."""
        cur = self.conn.execute("SELECT COUNT(*) FROM normalized_records")
        return cur.fetchone()[0]

    def fetch_all_records(self) -> List[dict]:
        """
        Return all normalized records as dicts keyed by column name.

        Values come back exactly as stored; ``extra`` is the JSON text, not a
        decoded object.
        """
        return self._fetch_dicts("SELECT * FROM normalized_records")

    def fetch_stage_results(self) -> List[dict]:
        """
        Return all stage result rows as dicts keyed by column name.

        ``metadata`` is returned as the stored JSON text.
        """
        return self._fetch_dicts("SELECT * FROM stage_results")
def create_storage(db_path: Path) -> StorageBackend:
    """
    Build a ``StorageBackend`` for *db_path* and open it right away.

    Args:
        db_path: Location of the SQLite database file.

    Returns:
        The backend, on which ``connect()`` has already been called.
    """
    storage = StorageBackend(db_path)
    # Connect eagerly so the caller receives a backend that is ready to use.
    storage.connect()
    return storage