HaramGuard / backend /core /database.py
adeem6's picture
Upload 52 files
f492127 verified
raw
history blame
8.1 kB
"""
HaramGuard — Database Layer
============================
SQLite persistence for risk events, operational decisions, and coordinator plans.
All agents write/read through this single interface.
"""
import os
import json
import sqlite3
from datetime import datetime
from typing import Optional
from core.models import RiskResult, Decision
class HajjFlowDB:
    """SQLite-backed persistence shared by all HaramGuard agents.

    Tables created on first use:
        risk_events       -- per-frame risk scores, level, trend, window stats
        op_decisions      -- operational decisions (context, priority, actions)
        coordinator_plans -- coordinator plan summaries plus the raw JSON plan
        reflection_log    -- reflection corrections and bias-detection flags
    """

    def __init__(self, path: str = 'outputs/hajjflow_rt.db'):
        """Open (creating tables if needed) the SQLite database at *path*.

        Args:
            path: Filesystem path of the database file. Parent directories
                are created on demand.
        """
        # os.path.dirname('db.sqlite') is '' and makedirs('') raises
        # FileNotFoundError, so only create the parent when one exists.
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        # check_same_thread=False permits access from multiple threads;
        # NOTE(review): no locking is done here — callers are assumed to
        # serialise writes themselves. Confirm against the agent runtime.
        self.conn = sqlite3.connect(path, check_same_thread=False)
        self._init_tables()
        print(f'πŸ—„οΈ [DB] Connected -> {path}')

    def close(self) -> None:
        """Close the underlying SQLite connection (idempotent per sqlite3)."""
        self.conn.close()

    def _init_tables(self) -> None:
        """Create all tables if they do not already exist."""
        self.conn.executescript("""
        CREATE TABLE IF NOT EXISTS risk_events (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            frame_id INTEGER,
            timestamp TEXT,
            risk_score REAL,
            risk_level TEXT,
            trend TEXT,
            window_avg REAL,
            window_max INTEGER
        );
        CREATE TABLE IF NOT EXISTS op_decisions (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            frame_id INTEGER,
            timestamp TEXT,
            context TEXT,
            priority TEXT,
            actions TEXT,
            risk_score REAL,
            risk_level TEXT
        );
        CREATE TABLE IF NOT EXISTS coordinator_plans (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            frame_id INTEGER,
            timestamp TEXT,
            threat_level TEXT,
            summary TEXT,
            arabic_alert TEXT,
            confidence REAL,
            raw_json TEXT
        );
        CREATE TABLE IF NOT EXISTS reflection_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            frame_id INTEGER,
            timestamp TEXT,
            original_level TEXT,
            original_score REAL,
            corrected_level TEXT,
            corrected_score REAL,
            bias_detected INTEGER,
            critique TEXT,
            person_count INTEGER
        );
        """)
        self.conn.commit()

    # ── Writes ───────────────────────────────────────────────────────────
    def save_risk_event(self, rr: RiskResult) -> None:
        """Persist one risk event.

        Args:
            rr: Risk result whose ``timestamp`` is an epoch value
                (converted to ISO-8601 text for storage).
        """
        self.conn.execute(
            'INSERT INTO risk_events '
            '(frame_id,timestamp,risk_score,risk_level,trend,window_avg,window_max) '
            'VALUES (?,?,?,?,?,?,?)',
            (rr.frame_id,
             datetime.fromtimestamp(rr.timestamp).isoformat(),
             rr.risk_score, rr.risk_level, rr.trend,
             rr.window_avg, rr.window_max))
        self.conn.commit()

    def save_decision(self, d: Decision) -> None:
        """Persist one operational decision.

        ``d.timestamp`` is stored as-is; presumably it is already ISO-8601
        text, since get_last_p0_time() sorts it lexicographically and parses
        it with ``datetime.fromisoformat`` — TODO confirm against Decision.
        """
        self.conn.execute(
            'INSERT INTO op_decisions '
            '(frame_id,timestamp,context,priority,actions,risk_score,risk_level) '
            'VALUES (?,?,?,?,?,?,?)',
            (d.frame_id, d.timestamp, d.context, d.priority,
             json.dumps(d.actions), d.risk_score, d.risk_level))
        self.conn.commit()

    def save_coordinator_plan(self, frame_id: int, plan: dict) -> None:
        """Persist a coordinator plan, keeping both key fields and raw JSON.

        Args:
            frame_id: Frame the plan refers to.
            plan: Plan dict; missing keys fall back to neutral defaults.
        """
        self.conn.execute(
            'INSERT INTO coordinator_plans '
            '(frame_id,timestamp,threat_level,summary,arabic_alert,confidence,raw_json) '
            'VALUES (?,?,?,?,?,?,?)',
            (frame_id, datetime.now().isoformat(),
             plan.get('threat_level', 'UNKNOWN'),
             plan.get('executive_summary', ''),
             plan.get('arabic_alert', ''),
             plan.get('confidence_score', 0),
             # ensure_ascii=False keeps the Arabic alert human-readable.
             json.dumps(plan, ensure_ascii=False)))
        self.conn.commit()

    def save_reflection(self, reflection: dict) -> None:
        """Persist one reflection record.

        Raises:
            KeyError: if any required key is missing from *reflection* —
                intentional, so malformed records fail loudly.
        """
        self.conn.execute(
            'INSERT INTO reflection_log '
            '(frame_id,timestamp,original_level,original_score,'
            'corrected_level,corrected_score,bias_detected,critique,person_count) '
            'VALUES (?,?,?,?,?,?,?,?,?)',
            (reflection['frame_id'],
             datetime.now().isoformat(),
             reflection['original_level'],
             reflection['original_score'],
             reflection['corrected_level'],
             reflection['corrected_score'],
             int(reflection['bias_detected']),  # store booleans as 0/1
             reflection['critique'],
             reflection['person_count']))
        self.conn.commit()

    # ── Reads ────────────────────────────────────────────────────────────
    def get_last_p0_time(self, context: str) -> Optional[datetime]:
        """Return the timestamp of the newest P0 decision for *context*.

        Returns None when no P0 decision exists. Relies on timestamps being
        ISO-8601 text so lexicographic ORDER BY matches chronological order.
        """
        # 'P0' is bound as a parameter: the previous double-quoted "P0" is
        # an identifier per the SQL standard and only worked via SQLite's
        # legacy leniency toward double-quoted string literals.
        cur = self.conn.execute(
            'SELECT timestamp FROM op_decisions '
            'WHERE context=? AND priority=? '
            'ORDER BY timestamp DESC LIMIT 1', (context, 'P0'))
        row = cur.fetchone()
        return datetime.fromisoformat(row[0]) if row else None

    def get_recent_decisions(self, limit: int = 10) -> list:
        """
        Return recent decisions enriched with coordinator plan data
        (selected_gates, justification) via LEFT JOIN on frame_id.
        """
        cur = self.conn.execute(
            'SELECT d.frame_id, d.timestamp, d.context, d.priority, '
            '       d.actions, d.risk_score, d.risk_level, '
            '       cp.raw_json AS plan_json '
            'FROM op_decisions d '
            'LEFT JOIN coordinator_plans cp ON cp.frame_id = d.frame_id '
            'ORDER BY d.id DESC LIMIT ?', (limit,))
        rows = []
        for row in cur.fetchall():
            record = {
                'frame_id': row[0],
                'timestamp': row[1],
                'context': row[2],
                'priority': row[3],
                'actions': row[4],
                'risk_score': row[5],
                'risk_level': row[6],
            }
            # Enrich with coordinator plan fields if available; a plan whose
            # JSON fails to parse is skipped rather than failing the read.
            if row[7]:
                try:
                    plan = json.loads(row[7])
                    record['selected_gates'] = plan.get('selected_gates', [])
                    record['justification'] = plan.get('actions_justification', '')
                    record['arabic_alert'] = plan.get('arabic_alert', '')
                    record['threat_level'] = plan.get('threat_level', '')
                    record['confidence'] = plan.get('confidence_score', 0)
                    record['immediate_actions'] = plan.get('immediate_actions', [])
                except (json.JSONDecodeError, TypeError):
                    # Best-effort enrichment only; base record is still returned.
                    pass
            rows.append(record)
        return rows

    def get_risk_history(self, limit: int = 60) -> list:
        """Return up to *limit* most recent risk events in chronological order."""
        cur = self.conn.execute(
            'SELECT frame_id,risk_score,risk_level,trend,window_avg '
            'FROM risk_events ORDER BY id DESC LIMIT ?', (limit,))
        cols = [c[0] for c in cur.description]
        # Query fetches newest-first; reverse so callers get oldest-first.
        return list(reversed([dict(zip(cols, row)) for row in cur.fetchall()]))

    def get_reflection_summary(self) -> dict:
        """Summarise reflection_log: total rows, bias events, bias rate (%)."""
        cur = self.conn.execute(
            'SELECT bias_detected, original_level, corrected_level FROM reflection_log')
        rows = cur.fetchall()
        if not rows:
            return {'total': 0, 'bias_events': 0, 'bias_rate_pct': 0}
        total = len(rows)
        biased = sum(1 for r in rows if r[0])
        return {
            'total': total,
            'bias_events': biased,
            'bias_rate_pct': round(biased / total * 100, 1),
        }

    def print_summary(self) -> None:
        """Print row counts for every table (debug/ops convenience)."""
        for tbl in ['risk_events', 'op_decisions', 'coordinator_plans', 'reflection_log']:
            # Table names come from the fixed list above, never user input,
            # so the f-string interpolation is safe here.
            n = self.conn.execute(f'SELECT COUNT(*) FROM {tbl}').fetchone()[0]
            print(f'  {tbl:<25} -> {n:4d} rows')