File size: 8,095 Bytes
f492127
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
"""
HaramGuard β€” Database Layer
============================
SQLite persistence for risk events, operational decisions, and coordinator plans.
All agents write/read through this single interface.
"""

import os
import json
import sqlite3
from datetime import datetime
from typing import Optional

from core.models import RiskResult, Decision


class HajjFlowDB:

    def __init__(self, path: str = 'outputs/hajjflow_rt.db'):
        os.makedirs(os.path.dirname(path), exist_ok=True)
        self.conn = sqlite3.connect(path, check_same_thread=False)
        self._init_tables()
        print(f'πŸ—„οΈ  [DB] Connected -> {path}')

    def _init_tables(self):
        self.conn.executescript("""
            CREATE TABLE IF NOT EXISTS risk_events (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                frame_id    INTEGER,
                timestamp   TEXT,
                risk_score  REAL,
                risk_level  TEXT,
                trend       TEXT,
                window_avg  REAL,
                window_max  INTEGER
            );
            CREATE TABLE IF NOT EXISTS op_decisions (
                id          INTEGER PRIMARY KEY AUTOINCREMENT,
                frame_id    INTEGER,
                timestamp   TEXT,
                context     TEXT,
                priority    TEXT,
                actions     TEXT,
                risk_score  REAL,
                risk_level  TEXT
            );
            CREATE TABLE IF NOT EXISTS coordinator_plans (
                id           INTEGER PRIMARY KEY AUTOINCREMENT,
                frame_id     INTEGER,
                timestamp    TEXT,
                threat_level TEXT,
                summary      TEXT,
                arabic_alert TEXT,
                confidence   REAL,
                raw_json     TEXT
            );
            CREATE TABLE IF NOT EXISTS reflection_log (
                id              INTEGER PRIMARY KEY AUTOINCREMENT,
                frame_id        INTEGER,
                timestamp       TEXT,
                original_level  TEXT,
                original_score  REAL,
                corrected_level TEXT,
                corrected_score REAL,
                bias_detected   INTEGER,
                critique        TEXT,
                person_count    INTEGER
            );
        """)
        self.conn.commit()

    # ── Writes ───────────────────────────────────────────────────────────

    def save_risk_event(self, rr: RiskResult):
        self.conn.execute(
            'INSERT INTO risk_events '
            '(frame_id,timestamp,risk_score,risk_level,trend,window_avg,window_max) '
            'VALUES (?,?,?,?,?,?,?)',
            (rr.frame_id,
             datetime.fromtimestamp(rr.timestamp).isoformat(),
             rr.risk_score, rr.risk_level, rr.trend,
             rr.window_avg, rr.window_max))
        self.conn.commit()

    def save_decision(self, d: Decision):
        self.conn.execute(
            'INSERT INTO op_decisions '
            '(frame_id,timestamp,context,priority,actions,risk_score,risk_level) '
            'VALUES (?,?,?,?,?,?,?)',
            (d.frame_id, d.timestamp, d.context, d.priority,
             json.dumps(d.actions), d.risk_score, d.risk_level))
        self.conn.commit()

    def save_coordinator_plan(self, frame_id: int, plan: dict):
        self.conn.execute(
            'INSERT INTO coordinator_plans '
            '(frame_id,timestamp,threat_level,summary,arabic_alert,confidence,raw_json) '
            'VALUES (?,?,?,?,?,?,?)',
            (frame_id, datetime.now().isoformat(),
             plan.get('threat_level', 'UNKNOWN'),
             plan.get('executive_summary', ''),
             plan.get('arabic_alert', ''),
             plan.get('confidence_score', 0),
             json.dumps(plan, ensure_ascii=False)))
        self.conn.commit()

    def save_reflection(self, reflection: dict):
        self.conn.execute(
            'INSERT INTO reflection_log '
            '(frame_id,timestamp,original_level,original_score,'
            'corrected_level,corrected_score,bias_detected,critique,person_count) '
            'VALUES (?,?,?,?,?,?,?,?,?)',
            (reflection['frame_id'],
             datetime.now().isoformat(),
             reflection['original_level'],
             reflection['original_score'],
             reflection['corrected_level'],
             reflection['corrected_score'],
             int(reflection['bias_detected']),
             reflection['critique'],
             reflection['person_count']))
        self.conn.commit()

    # ── Reads ────────────────────────────────────────────────────────────

    def get_last_p0_time(self, context: str) -> Optional[datetime]:
        cur = self.conn.execute(
            'SELECT timestamp FROM op_decisions '
            'WHERE context=? AND priority="P0" '
            'ORDER BY timestamp DESC LIMIT 1', (context,))
        row = cur.fetchone()
        return datetime.fromisoformat(row[0]) if row else None

    def get_recent_decisions(self, limit: int = 10) -> list:
        """
        Return recent decisions enriched with coordinator plan data
        (selected_gates, justification) via LEFT JOIN on frame_id.
        """
        cur = self.conn.execute(
            'SELECT d.frame_id, d.timestamp, d.context, d.priority, '
            '       d.actions, d.risk_score, d.risk_level, '
            '       cp.raw_json AS plan_json '
            'FROM op_decisions d '
            'LEFT JOIN coordinator_plans cp ON cp.frame_id = d.frame_id '
            'ORDER BY d.id DESC LIMIT ?', (limit,))
        rows = []
        for row in cur.fetchall():
            record = {
                'frame_id':   row[0],
                'timestamp':  row[1],
                'context':    row[2],
                'priority':   row[3],
                'actions':    row[4],
                'risk_score': row[5],
                'risk_level': row[6],
            }
            # Enrich with coordinator plan fields if available
            if row[7]:
                try:
                    plan = json.loads(row[7])
                    record['selected_gates']       = plan.get('selected_gates', [])
                    record['justification']        = plan.get('actions_justification', '')
                    record['arabic_alert']         = plan.get('arabic_alert', '')
                    record['threat_level']         = plan.get('threat_level', '')
                    record['confidence']           = plan.get('confidence_score', 0)
                    record['immediate_actions']    = plan.get('immediate_actions', [])
                except Exception:
                    pass
            rows.append(record)
        return rows

    def get_risk_history(self, limit: int = 60) -> list:
        cur = self.conn.execute(
            'SELECT frame_id,risk_score,risk_level,trend,window_avg '
            'FROM risk_events ORDER BY id DESC LIMIT ?', (limit,))
        cols = [c[0] for c in cur.description]
        return list(reversed([dict(zip(cols, row)) for row in cur.fetchall()]))

    def get_reflection_summary(self) -> dict:
        cur = self.conn.execute(
            'SELECT bias_detected, original_level, corrected_level FROM reflection_log')
        rows = cur.fetchall()
        if not rows:
            return {'total': 0, 'bias_events': 0, 'bias_rate_pct': 0}
        total  = len(rows)
        biased = sum(1 for r in rows if r[0])
        return {
            'total':          total,
            'bias_events':    biased,
            'bias_rate_pct':  round(biased / total * 100, 1),
        }

    def print_summary(self):
        for tbl in ['risk_events', 'op_decisions', 'coordinator_plans', 'reflection_log']:
            n = self.conn.execute(f'SELECT COUNT(*) FROM {tbl}').fetchone()[0]
            print(f'  {tbl:<25} -> {n:4d} rows')