Spaces:
Running
Running
| """ | |
| HaramGuard β Database Layer | |
| ============================ | |
| SQLite persistence for risk events, operational decisions, and coordinator plans. | |
| All agents write/read through this single interface. | |
| """ | |
| import os | |
| import json | |
| import sqlite3 | |
| from datetime import datetime | |
| from typing import Optional | |
| from core.models import RiskResult, Decision | |
| class HajjFlowDB: | |
| def __init__(self, path: str = 'outputs/hajjflow_rt.db'): | |
| os.makedirs(os.path.dirname(path), exist_ok=True) | |
| self.conn = sqlite3.connect(path, check_same_thread=False) | |
| self._init_tables() | |
| print(f'ποΈ [DB] Connected -> {path}') | |
| def _init_tables(self): | |
| self.conn.executescript(""" | |
| CREATE TABLE IF NOT EXISTS risk_events ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| frame_id INTEGER, | |
| timestamp TEXT, | |
| risk_score REAL, | |
| risk_level TEXT, | |
| trend TEXT, | |
| window_avg REAL, | |
| window_max INTEGER | |
| ); | |
| CREATE TABLE IF NOT EXISTS op_decisions ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| frame_id INTEGER, | |
| timestamp TEXT, | |
| context TEXT, | |
| priority TEXT, | |
| actions TEXT, | |
| risk_score REAL, | |
| risk_level TEXT | |
| ); | |
| CREATE TABLE IF NOT EXISTS coordinator_plans ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| frame_id INTEGER, | |
| timestamp TEXT, | |
| threat_level TEXT, | |
| summary TEXT, | |
| arabic_alert TEXT, | |
| confidence REAL, | |
| raw_json TEXT | |
| ); | |
| CREATE TABLE IF NOT EXISTS reflection_log ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| frame_id INTEGER, | |
| timestamp TEXT, | |
| original_level TEXT, | |
| original_score REAL, | |
| corrected_level TEXT, | |
| corrected_score REAL, | |
| bias_detected INTEGER, | |
| critique TEXT, | |
| person_count INTEGER | |
| ); | |
| """) | |
| self.conn.commit() | |
| # ββ Writes βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def save_risk_event(self, rr: RiskResult): | |
| self.conn.execute( | |
| 'INSERT INTO risk_events ' | |
| '(frame_id,timestamp,risk_score,risk_level,trend,window_avg,window_max) ' | |
| 'VALUES (?,?,?,?,?,?,?)', | |
| (rr.frame_id, | |
| datetime.fromtimestamp(rr.timestamp).isoformat(), | |
| rr.risk_score, rr.risk_level, rr.trend, | |
| rr.window_avg, rr.window_max)) | |
| self.conn.commit() | |
| def save_decision(self, d: Decision): | |
| self.conn.execute( | |
| 'INSERT INTO op_decisions ' | |
| '(frame_id,timestamp,context,priority,actions,risk_score,risk_level) ' | |
| 'VALUES (?,?,?,?,?,?,?)', | |
| (d.frame_id, d.timestamp, d.context, d.priority, | |
| json.dumps(d.actions), d.risk_score, d.risk_level)) | |
| self.conn.commit() | |
| def save_coordinator_plan(self, frame_id: int, plan: dict): | |
| self.conn.execute( | |
| 'INSERT INTO coordinator_plans ' | |
| '(frame_id,timestamp,threat_level,summary,arabic_alert,confidence,raw_json) ' | |
| 'VALUES (?,?,?,?,?,?,?)', | |
| (frame_id, datetime.now().isoformat(), | |
| plan.get('threat_level', 'UNKNOWN'), | |
| plan.get('executive_summary', ''), | |
| plan.get('arabic_alert', ''), | |
| plan.get('confidence_score', 0), | |
| json.dumps(plan, ensure_ascii=False))) | |
| self.conn.commit() | |
| def save_reflection(self, reflection: dict): | |
| self.conn.execute( | |
| 'INSERT INTO reflection_log ' | |
| '(frame_id,timestamp,original_level,original_score,' | |
| 'corrected_level,corrected_score,bias_detected,critique,person_count) ' | |
| 'VALUES (?,?,?,?,?,?,?,?,?)', | |
| (reflection['frame_id'], | |
| datetime.now().isoformat(), | |
| reflection['original_level'], | |
| reflection['original_score'], | |
| reflection['corrected_level'], | |
| reflection['corrected_score'], | |
| int(reflection['bias_detected']), | |
| reflection['critique'], | |
| reflection['person_count'])) | |
| self.conn.commit() | |
| # ββ Reads ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def get_last_p0_time(self, context: str) -> Optional[datetime]: | |
| cur = self.conn.execute( | |
| 'SELECT timestamp FROM op_decisions ' | |
| 'WHERE context=? AND priority="P0" ' | |
| 'ORDER BY timestamp DESC LIMIT 1', (context,)) | |
| row = cur.fetchone() | |
| return datetime.fromisoformat(row[0]) if row else None | |
| def get_recent_decisions(self, limit: int = 10) -> list: | |
| """ | |
| Return recent decisions enriched with coordinator plan data | |
| (selected_gates, justification) via LEFT JOIN on frame_id. | |
| """ | |
| cur = self.conn.execute( | |
| 'SELECT d.frame_id, d.timestamp, d.context, d.priority, ' | |
| ' d.actions, d.risk_score, d.risk_level, ' | |
| ' cp.raw_json AS plan_json ' | |
| 'FROM op_decisions d ' | |
| 'LEFT JOIN coordinator_plans cp ON cp.frame_id = d.frame_id ' | |
| 'ORDER BY d.id DESC LIMIT ?', (limit,)) | |
| rows = [] | |
| for row in cur.fetchall(): | |
| record = { | |
| 'frame_id': row[0], | |
| 'timestamp': row[1], | |
| 'context': row[2], | |
| 'priority': row[3], | |
| 'actions': row[4], | |
| 'risk_score': row[5], | |
| 'risk_level': row[6], | |
| } | |
| # Enrich with coordinator plan fields if available | |
| if row[7]: | |
| try: | |
| plan = json.loads(row[7]) | |
| record['selected_gates'] = plan.get('selected_gates', []) | |
| record['justification'] = plan.get('actions_justification', '') | |
| record['arabic_alert'] = plan.get('arabic_alert', '') | |
| record['threat_level'] = plan.get('threat_level', '') | |
| record['confidence'] = plan.get('confidence_score', 0) | |
| record['immediate_actions'] = plan.get('immediate_actions', []) | |
| except Exception: | |
| pass | |
| rows.append(record) | |
| return rows | |
| def get_risk_history(self, limit: int = 60) -> list: | |
| cur = self.conn.execute( | |
| 'SELECT frame_id,risk_score,risk_level,trend,window_avg ' | |
| 'FROM risk_events ORDER BY id DESC LIMIT ?', (limit,)) | |
| cols = [c[0] for c in cur.description] | |
| return list(reversed([dict(zip(cols, row)) for row in cur.fetchall()])) | |
| def get_reflection_summary(self) -> dict: | |
| cur = self.conn.execute( | |
| 'SELECT bias_detected, original_level, corrected_level FROM reflection_log') | |
| rows = cur.fetchall() | |
| if not rows: | |
| return {'total': 0, 'bias_events': 0, 'bias_rate_pct': 0} | |
| total = len(rows) | |
| biased = sum(1 for r in rows if r[0]) | |
| return { | |
| 'total': total, | |
| 'bias_events': biased, | |
| 'bias_rate_pct': round(biased / total * 100, 1), | |
| } | |
| def print_summary(self): | |
| for tbl in ['risk_events', 'op_decisions', 'coordinator_plans', 'reflection_log']: | |
| n = self.conn.execute(f'SELECT COUNT(*) FROM {tbl}').fetchone()[0] | |
| print(f' {tbl:<25} -> {n:4d} rows') | |