File size: 15,471 Bytes
f492127
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e89e5aa
 
 
 
f492127
e89e5aa
f492127
 
e89e5aa
f492127
e89e5aa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f492127
 
 
 
 
e89e5aa
 
 
 
f492127
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e89e5aa
 
 
 
 
 
 
 
 
 
f492127
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
"""
HaramGuard β€” FastAPI REST Server
==================================
Runs the video pipeline in a background thread and exposes real-time state
via a JSON REST API for the React frontend dashboard.

Endpoints:
  GET  /api/realtime/state          β†’ serialized pipeline state
  POST /api/actions/{id}/approve    β†’ log action approval
  POST /api/actions/{id}/reject     β†’ log action rejection
  GET  /health                      β†’ liveness check

Run:
  cd backend
  python api.py
"""

import os
import sys
import json
import time
import base64
import threading
import dataclasses
from typing import Any

import cv2
import numpy as np
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

# ── Make backend importable from its own directory ─────────────────────
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

import config
from pipeline import RealTimePipeline

# ── App setup ─────────────────────────────────────────────────────────
app = FastAPI(title='HaramGuard API', version='1.0')
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_methods=['*'],
    allow_headers=['*'],
)

# ── Global state ──────────────────────────────────────────────────────
pipeline: RealTimePipeline = None
_action_log: dict = {}      # {action_id: 'approved' | 'rejected'}

# ── Frame serialization ───────────────────────────────────────────────

def _encode_frame(frame: np.ndarray) -> str:
    """Convert BGR ndarray to base64 JPEG string for the frontend."""
    try:
        _, buf = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 70])
        return base64.b64encode(buf.tobytes()).decode('utf-8')
    except Exception:
        return None


def _to_json_safe(obj: Any) -> Any:
    """Recursively convert non-JSON-serializable objects."""
    if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
        return dataclasses.asdict(obj)
    if isinstance(obj, np.ndarray):
        return None
    if isinstance(obj, (np.integer,)):
        return int(obj)
    if isinstance(obj, (np.floating,)):
        return float(obj)
    if isinstance(obj, dict):
        return {k: _to_json_safe(v) for k, v in obj.items()}
    if isinstance(obj, list):
        return [_to_json_safe(v) for v in obj]
    return obj


# ── State enrichment ──────────────────────────────────────────────────

def _build_agent_stats(decisions_log: list) -> dict:
    """Derive agent_stats from decisions log and action overrides."""
    urgent   = sum(1 for d in decisions_log if d.get('priority') == 'P0')
    pending  = sum(
        1 for d in decisions_log
        if str(d.get('frame_id', '')) not in _action_log
        and d.get('priority') in ('P0', 'P1')
    )
    resolved = sum(1 for v in _action_log.values() if v == 'approved')
    return {
        'resolved_alerts':      resolved,
        'pending_decisions':    pending,
        'urgent_interventions': urgent,
    }


def _build_gates(risk_level: str) -> list:
    """Generate 6 gate statuses derived from current risk level."""
    if risk_level == 'HIGH':
        statuses = ['open', 'partial', 'open', 'closed', 'open', 'partial']
    elif risk_level == 'MEDIUM':
        statuses = ['open', 'open', 'partial', 'open', 'open', 'open']
    else:
        statuses = ['open', 'open', 'open', 'open', 'open', 'open']
    names = [
        'بوابة Ψ§Ω„Ω…Ω„Ωƒ ΨΉΨ¨Ψ―Ψ§Ω„ΨΉΨ²ΩŠΨ²', 'بوابة Ψ§Ω„Ψ³Ω„Ψ§Ω…', 'بوابة الفهد',
        'بوابة ΨΉΩ…Ψ±', 'بوابة Ψ§Ω„Ψ΄Ψ¨ΩŠΩƒΨ©', 'بوابة Ψ§Ω„Ω…Ψ±ΩˆΨ©',
    ]
    return [
        {'id': str(i + 1), 'name': names[i], 'status': statuses[i]}
        for i in range(6)
    ]


def _build_proposed_actions(decisions_log: list) -> list:
    """
    Build proposed_actions from pipeline.state (latest_decision + coordinator_plan).
    The DB decisions_log is used only as a history source for older entries;
    the most recent decision is always taken from live state to avoid stale data.
    """
    if not pipeline:
        return []

    priority_map = {'P0': 'urgent', 'P1': 'watch', 'P2': 'completed'}
    result       = []
    seen_ids     = set()

    # ── 1. Latest decision (from live state β€” always up-to-date) ─────────
    latest   = pipeline.state.get('latest_decision')
    plan     = pipeline.state.get('coordinator_plan') or {}

    if latest is not None:
        import dataclasses
        d = dataclasses.asdict(latest) if dataclasses.is_dataclass(latest) else dict(latest)

        action_id  = str(d.get('frame_id', 'latest'))
        seen_ids.add(action_id)
        override   = _action_log.get(action_id)
        actions    = d.get('actions') or plan.get('immediate_actions') or []
        gates      = d.get('selected_gates') or plan.get('selected_gates') or []
        just       = d.get('justification') or plan.get('actions_justification') or ''
        priority   = override or priority_map.get(d.get('priority', 'P2'), 'watch')

        result.append({
            'id':             action_id,
            'timestamp':      d.get('timestamp', ''),
            'priority':       priority,
            'title':          actions[0] if actions else plan.get('executive_summary', 'Ω…Ψ±Ψ§Ω‚Ψ¨Ψ©'),
            'description':    just,
            'all_actions':    actions,
            'selected_gates': gates,
            'threat_level':   plan.get('threat_level', ''),
            'arabic_alert':   plan.get('arabic_alert', ''),
            'confidence':     plan.get('confidence_score', 0),
        })

    # ── 2. History from decisions_log (enriched with coordinator plan via DB JOIN) ─
    for d in decisions_log[:5]:
        action_id = str(d.get('frame_id', id(d)))
        if action_id in seen_ids:
            continue
        seen_ids.add(action_id)

        override = _action_log.get(action_id)

        # Prefer immediate_actions from coordinator plan (richer); fallback to actions
        immediate = d.get('immediate_actions') or []
        if not immediate:
            raw_actions = d.get('actions', '[]')
            if isinstance(raw_actions, str):
                try:
                    immediate = json.loads(raw_actions)
                except Exception:
                    immediate = []
            elif isinstance(raw_actions, list):
                immediate = raw_actions

        selected_gates = d.get('selected_gates') or []
        justification  = d.get('justification', '')
        base_priority  = priority_map.get(d.get('priority', 'P2'), 'watch')

        result.append({
            'id':             action_id,
            'timestamp':      d.get('timestamp', ''),
            'priority':       override or base_priority,
            'title':          immediate[0] if immediate else 'Ω…Ψ±Ψ§Ω‚Ψ¨Ψ©',
            'description':    justification,
            'all_actions':    immediate,
            'selected_gates': selected_gates,
            'threat_level':   d.get('threat_level', ''),
            'arabic_alert':   d.get('arabic_alert', ''),
            'confidence':     d.get('confidence', 0),
        })

    return result[:5]


def _serialize_state() -> dict:
    """Produce a JSON-safe snapshot of pipeline.state."""
    s = dict(pipeline.state)

    # Annotated frame β†’ base64 JPEG
    ann = s.get('annotated')
    s['annotated'] = _encode_frame(ann) if isinstance(ann, np.ndarray) else None

    # Decision dataclass β†’ dict
    ld = s.get('latest_decision')
    if ld is not None and dataclasses.is_dataclass(ld):
        s['latest_decision'] = dataclasses.asdict(ld)

    # decisions_log is already list[dict] from DB
    decisions_log = _to_json_safe(s.get('decisions_log', []))
    s['decisions_log'] = decisions_log

    # Scalar numpy types β†’ Python floats/ints
    s['risk_score']    = float(s.get('risk_score', 0.0))
    s['density_score'] = float(s.get('density_score', 0.0))
    s['density_ema']   = float(s.get('density_ema', 0.0))
    s['density_pct']   = float(s.get('density_pct', 0.0))
    s['fps']           = float(s.get('fps', 0.0))

    # Coordinator plan deep-copy safe
    s['coordinator_plan'] = _to_json_safe(s.get('coordinator_plan'))

    # Reflection summary
    s['reflection_summary'] = _to_json_safe(s.get('reflection_summary', {}))

    # Inject frontend-required fields
    s['agent_stats']      = _build_agent_stats(decisions_log)
    s['gates']            = _build_gates(s.get('risk_level', 'LOW'))
    s['proposed_actions'] = _build_proposed_actions(decisions_log)

    return s


# ── Background pipeline thread ─────────────────────────────────────────

def _pipeline_loop():
    cap = pipeline.get_video_capture()
    # ~10 FPS: fast enough to look real-time, slow enough for LLM to keep up
    TARGET_FPS  = 10
    frame_delay = 1.0 / TARGET_FPS
    print(f'πŸŽ₯ [API] Video loop started (target {TARGET_FPS} FPS, {frame_delay*1000:.0f}ms/frame)')
    while True:
        t0 = time.time()
        ret, frame = cap.read()
        if not ret:
            # Loop video β€” full reset so next loop plays fresh
            cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

            # Reset risk agent (clip state + EMA)
            pipeline.risk._reset_clip(0, 'video loop restart')
            pipeline.risk._peak_ema        = 0.0
            pipeline.risk._occ_ema         = 0.0
            pipeline.risk._prev_count      = 0
            pipeline.risk._prev_density_sc = 0.0
            pipeline.risk._boundary_buf    = 0
            pipeline.risk._in_boundary     = False

            # Reset perception frame counter
            pipeline.perception.frame_id = 0

            # Reset effective level tracker so first level change fires correctly
            pipeline._last_effective_risk_level = 'LOW'

            # Reset P0 rate-limit so previous loop's cooldown doesn't block
            from datetime import datetime as _dt
            pipeline.operations._boot_time = _dt.now()

            # Reset pipeline state (scores + decisions)
            pipeline.state['risk_score']       = 0.0
            pipeline.state['risk_level']       = 'LOW'
            pipeline.state['density_pct']      = 0.0
            pipeline.state['frame_id']         = 0
            pipeline.state['decisions_log']    = []
            pipeline.state['latest_decision']  = None
            pipeline.state['arabic_alert']     = ''
            pipeline.state['coordinator_plan'] = None

            # Clear frame buffer so frontend shows fresh frames
            pipeline._frame_buffer.clear()

            print('πŸ” [API] Video loop restarted β€” full state reset')
            continue
        try:
            pipeline.process_one_frame(frame)
        except Exception as exc:
            print(f'[API] Frame error: {exc}')
        # Pace to match real video FPS
        elapsed = time.time() - t0
        if elapsed < frame_delay:
            time.sleep(frame_delay - elapsed)


# ── Startup ────────────────────────────────────────────────────────────

@app.on_event('startup')
def startup_event():
    global pipeline
    pipeline = RealTimePipeline(
        video_path    = config.VIDEO_PATH,
        groq_api_key  = config.GROQ_API_KEY,
        anthropic_key = None,   # VisionCountAgent disabled β€” YOLO-only mode
        model_path    = config.MODEL_PATH,
        db_path       = config.DB_PATH,
    )
    thread = threading.Thread(target=_pipeline_loop, daemon=True)
    thread.start()
    print('πŸš€ [API] HaramGuard REST API ready on http://0.0.0.0:8000')


# ── Endpoints ──────────────────────────────────────────────────────────

@app.get('/api/realtime/state')
def get_state():
    if pipeline is None:
        return JSONResponse({'error': 'pipeline initializing'}, status_code=503)
    return JSONResponse(_serialize_state())


@app.get('/api/frames/buffer')
def get_frame_buffer():
    """Return the last N annotated frames as base64 JPEGs for the frontend scrubber."""
    if pipeline is None:
        return JSONResponse({'error': 'pipeline initializing'}, status_code=503)
    frames = []
    for fr in list(pipeline._frame_buffer):
        if fr.annotated is not None:
            encoded = _encode_frame(fr.annotated)
            if encoded:
                frames.append({
                    'frame_id':     fr.frame_id,
                    'person_count': fr.person_count,
                    'track_ids':    fr.track_ids,
                    'annotated':    encoded,
                })
    return JSONResponse({'frames': frames, 'count': len(frames)})


@app.post('/api/actions/{action_id}/approve')
def approve_action(action_id: str):
    _action_log[action_id] = 'approved'
    print(f'βœ… [API] Action {action_id} approved')
    return {'status': 'ok', 'action_id': action_id, 'result': 'approved'}


@app.post('/api/actions/{action_id}/reject')
def reject_action(action_id: str):
    _action_log[action_id] = 'rejected'
    print(f'❌ [API] Action {action_id} rejected')
    return {'status': 'ok', 'action_id': action_id, 'result': 'rejected'}


@app.post('/api/reset')
def reset_pipeline():
    """Reset pipeline state β€” video restarts from frame 0, all scores zeroed."""
    global _action_log
    if pipeline is None:
        return JSONResponse({'error': 'pipeline not ready'}, status_code=503)
    # Reset video to start
    cap = pipeline.get_video_capture()
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
    # Reset risk β€” full clip state + EMA
    pipeline.risk._reset_clip(0, 'API reset')
    pipeline.risk._peak_ema        = 0.0
    pipeline.risk._occ_ema         = 0.0
    pipeline.risk._prev_count      = 0
    pipeline.risk._prev_density_sc = 0.0
    pipeline.state['risk_score']   = 0.0
    pipeline.state['risk_level']   = 'LOW'
    pipeline.state['density_pct']  = 0.0
    pipeline.state['frame_id']     = 0
    # Reset perception agent
    pipeline.perception.frame_id = 0
    # Clear frame buffer
    pipeline._frame_buffer.clear()
    # Clear decisions
    pipeline.state['decisions_log']   = []
    pipeline.state['latest_decision'] = None
    pipeline.state['arabic_alert']    = ''
    pipeline.state['coordinator_plan'] = None
    # Clear action log
    _action_log = {}
    print('πŸ”„ [API] Pipeline reset β€” all state zeroed')
    return {'status': 'ok'}


@app.get('/health')
def health():
    return {'status': 'ok', 'pipeline_ready': pipeline is not None}


# ── Entry point ────────────────────────────────────────────────────────

if __name__ == '__main__':
    uvicorn.run('api:app', host='0.0.0.0', port=8000, reload=False)