""" HaramGuard — Database Layer ============================ SQLite persistence for risk events, operational decisions, and coordinator plans. All agents write/read through this single interface. """ import os import json import sqlite3 from datetime import datetime from typing import Optional from core.models import RiskResult, Decision class HajjFlowDB: def __init__(self, path: str = 'outputs/hajjflow_rt.db'): os.makedirs(os.path.dirname(path), exist_ok=True) self.conn = sqlite3.connect(path, check_same_thread=False) self._init_tables() print(f'🗄️ [DB] Connected -> {path}') def _init_tables(self): self.conn.executescript(""" CREATE TABLE IF NOT EXISTS risk_events ( id INTEGER PRIMARY KEY AUTOINCREMENT, frame_id INTEGER, timestamp TEXT, risk_score REAL, risk_level TEXT, trend TEXT, window_avg REAL, window_max INTEGER ); CREATE TABLE IF NOT EXISTS op_decisions ( id INTEGER PRIMARY KEY AUTOINCREMENT, frame_id INTEGER, timestamp TEXT, context TEXT, priority TEXT, actions TEXT, risk_score REAL, risk_level TEXT ); CREATE TABLE IF NOT EXISTS coordinator_plans ( id INTEGER PRIMARY KEY AUTOINCREMENT, frame_id INTEGER, timestamp TEXT, threat_level TEXT, summary TEXT, arabic_alert TEXT, confidence REAL, raw_json TEXT ); CREATE TABLE IF NOT EXISTS reflection_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, frame_id INTEGER, timestamp TEXT, original_level TEXT, original_score REAL, corrected_level TEXT, corrected_score REAL, bias_detected INTEGER, critique TEXT, person_count INTEGER ); """) self.conn.commit() # ── Writes ─────────────────────────────────────────────────────────── def save_risk_event(self, rr: RiskResult): self.conn.execute( 'INSERT INTO risk_events ' '(frame_id,timestamp,risk_score,risk_level,trend,window_avg,window_max) ' 'VALUES (?,?,?,?,?,?,?)', (rr.frame_id, datetime.fromtimestamp(rr.timestamp).isoformat(), rr.risk_score, rr.risk_level, rr.trend, rr.window_avg, rr.window_max)) self.conn.commit() def save_decision(self, d: Decision): self.conn.execute( 'INSERT INTO op_decisions ' '(frame_id,timestamp,context,priority,actions,risk_score,risk_level) ' 'VALUES (?,?,?,?,?,?,?)', (d.frame_id, d.timestamp, d.context, d.priority, json.dumps(d.actions), d.risk_score, d.risk_level)) self.conn.commit() def save_coordinator_plan(self, frame_id: int, plan: dict): self.conn.execute( 'INSERT INTO coordinator_plans ' '(frame_id,timestamp,threat_level,summary,arabic_alert,confidence,raw_json) ' 'VALUES (?,?,?,?,?,?,?)', (frame_id, datetime.now().isoformat(), plan.get('threat_level', 'UNKNOWN'), plan.get('executive_summary', ''), plan.get('arabic_alert', ''), plan.get('confidence_score', 0), json.dumps(plan, ensure_ascii=False))) self.conn.commit() def save_reflection(self, reflection: dict): self.conn.execute( 'INSERT INTO reflection_log ' '(frame_id,timestamp,original_level,original_score,' 'corrected_level,corrected_score,bias_detected,critique,person_count) ' 'VALUES (?,?,?,?,?,?,?,?,?)', (reflection['frame_id'], datetime.now().isoformat(), reflection['original_level'], reflection['original_score'], reflection['corrected_level'], reflection['corrected_score'], int(reflection['bias_detected']), reflection['critique'], reflection['person_count'])) self.conn.commit() # ── Reads ──────────────────────────────────────────────────────────── def get_last_p0_time(self, context: str) -> Optional[datetime]: cur = self.conn.execute( 'SELECT timestamp FROM op_decisions ' 'WHERE context=? AND priority="P0" ' 'ORDER BY timestamp DESC LIMIT 1', (context,)) row = cur.fetchone() return datetime.fromisoformat(row[0]) if row else None def get_recent_decisions(self, limit: int = 10) -> list: """ Return recent decisions enriched with coordinator plan data (selected_gates, justification) via LEFT JOIN on frame_id. """ cur = self.conn.execute( 'SELECT d.frame_id, d.timestamp, d.context, d.priority, ' ' d.actions, d.risk_score, d.risk_level, ' ' cp.raw_json AS plan_json ' 'FROM op_decisions d ' 'LEFT JOIN coordinator_plans cp ON cp.frame_id = d.frame_id ' 'ORDER BY d.id DESC LIMIT ?', (limit,)) rows = [] for row in cur.fetchall(): record = { 'frame_id': row[0], 'timestamp': row[1], 'context': row[2], 'priority': row[3], 'actions': row[4], 'risk_score': row[5], 'risk_level': row[6], } # Enrich with coordinator plan fields if available if row[7]: try: plan = json.loads(row[7]) record['selected_gates'] = plan.get('selected_gates', []) record['justification'] = plan.get('actions_justification', '') record['arabic_alert'] = plan.get('arabic_alert', '') record['threat_level'] = plan.get('threat_level', '') record['confidence'] = plan.get('confidence_score', 0) record['immediate_actions'] = plan.get('immediate_actions', []) except Exception: pass rows.append(record) return rows def get_risk_history(self, limit: int = 60) -> list: cur = self.conn.execute( 'SELECT frame_id,risk_score,risk_level,trend,window_avg ' 'FROM risk_events ORDER BY id DESC LIMIT ?', (limit,)) cols = [c[0] for c in cur.description] return list(reversed([dict(zip(cols, row)) for row in cur.fetchall()])) def get_reflection_summary(self) -> dict: cur = self.conn.execute( 'SELECT bias_detected, original_level, corrected_level FROM reflection_log') rows = cur.fetchall() if not rows: return {'total': 0, 'bias_events': 0, 'bias_rate_pct': 0} total = len(rows) biased = sum(1 for r in rows if r[0]) return { 'total': total, 'bias_events': biased, 'bias_rate_pct': round(biased / total * 100, 1), } def print_summary(self): for tbl in ['risk_events', 'op_decisions', 'coordinator_plans', 'reflection_log']: n = self.conn.execute(f'SELECT COUNT(*) FROM {tbl}').fetchone()[0] print(f' {tbl:<25} -> {n:4d} rows')