"""
HaramGuard — FastAPI REST Server
==================================
Runs the video pipeline in a background thread and exposes real-time state
via a JSON REST API for the React frontend dashboard.

Endpoints:
    GET  /api/realtime/state          → serialized pipeline state
    GET  /api/frames/buffer           → buffered annotated frames (base64 JPEG)
    POST /api/actions/{id}/approve    → log action approval
    POST /api/actions/{id}/reject     → log action rejection
    POST /api/reset                   → restart the video and zero pipeline state
    GET  /health                      → liveness check

Run:
    cd backend
    python api.py
"""

import os
import sys
import json
import time
import base64
import threading
import dataclasses
from datetime import datetime
from typing import Any, Optional

import cv2
import numpy as np
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

# ── Make backend importable from its own directory ─────────────────────
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

import config
from pipeline import RealTimePipeline

# ── App setup ─────────────────────────────────────────────────────────
app = FastAPI(title='HaramGuard API', version='1.0')
app.add_middleware(
    CORSMiddleware,
    allow_origins=['*'],
    allow_methods=['*'],
    allow_headers=['*'],
)

# ── Global state ──────────────────────────────────────────────────────
# Assigned by startup_event(); stays None until the pipeline is constructed.
pipeline: Optional[RealTimePipeline] = None
_action_log: dict = {}   # {action_id: 'approved' | 'rejected'}


# ── Frame serialization ───────────────────────────────────────────────
def _encode_frame(frame: np.ndarray) -> Optional[str]:
    """Convert BGR ndarray to base64 JPEG string for the frontend.

    Args:
        frame: Image array handed to ``cv2.imencode``.

    Returns:
        The JPEG bytes (quality 70) as a base64 UTF-8 string, or ``None``
        when encoding raises or OpenCV reports failure.
    """
    try:
        ok, buf = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 70])
        if not ok:
            # imencode signals failure through its first return value.
            return None
        return base64.b64encode(buf.tobytes()).decode('utf-8')
    except Exception:
        # Best-effort: a frame that cannot be encoded is simply omitted.
        return None


def _to_json_safe(obj: Any) -> Any:
    """Recursively convert non-JSON-serializable objects.

    Conversions applied:
        * dataclass instance → dict (contents are sanitized as well)
        * ``np.ndarray``     → ``None`` (arrays are dropped, not serialized)
        * NumPy bool/integer/floating scalars → ``bool`` / ``int`` / ``float``
        * dict / list / tuple → same structure with sanitized values
          (tuples become lists, which is how JSON renders them anyway)

    Anything else is returned unchanged.
    """
    if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
        # asdict() only unwraps nested dataclasses; recurse so NumPy values
        # stored in fields are converted too.
        return _to_json_safe(dataclasses.asdict(obj))
    if isinstance(obj, np.ndarray):
        return None
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, dict):
        return {k: _to_json_safe(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [_to_json_safe(v) for v in obj]
    return obj


# ── State enrichment ──────────────────────────────────────────────────
def _build_agent_stats(decisions_log: list) -> dict:
    """Derive agent_stats from decisions log and action overrides.

    Args:
        decisions_log: List of decision dicts (reads ``priority`` and
            ``frame_id``).

    Returns:
        Dict with:
            ``resolved_alerts``      – number of entries in the action log
                                       marked ``'approved'``;
            ``pending_decisions``    – P0/P1 decisions whose ``frame_id`` has
                                       no entry in the action log;
            ``urgent_interventions`` – number of P0 decisions.
    """
    urgent = sum(1 for d in decisions_log if d.get('priority') == 'P0')
    pending = sum(
        1 for d in decisions_log
        if str(d.get('frame_id', '')) not in _action_log
        and d.get('priority') in ('P0', 'P1')
    )
    resolved = sum(1 for v in _action_log.values() if v == 'approved')
    return {
        'resolved_alerts': resolved,
        'pending_decisions': pending,
        'urgent_interventions': urgent,
    }


def _build_gates(risk_level: str) -> list:
    """Generate 6 gate statuses derived from current risk level.

    Args:
        risk_level: ``'HIGH'``, ``'MEDIUM'`` or anything else (treated as
            low risk: every gate open).

    Returns:
        Six dicts ``{'id', 'name', 'status'}`` with ids ``'1'``..``'6'``.
    """
    statuses_by_level = {
        'HIGH': ['open', 'partial', 'open', 'closed', 'open', 'partial'],
        'MEDIUM': ['open', 'open', 'partial', 'open', 'open', 'open'],
    }
    statuses = statuses_by_level.get(risk_level, ['open'] * 6)
    names = [
        'بوابة الملك عبدالعزيز',
        'بوابة السلام',
        'بوابة الفهد',
        'بوابة عمر',
        'بوابة الشبيكة',
        'بوابة المروة',
    ]
    return [
        {'id': str(idx), 'name': name, 'status': status}
        for idx, (name, status) in enumerate(zip(names, statuses), start=1)
    ]


def _coerce_actions(raw: Any) -> list:
    """Return *raw* as a list of actions, decoding JSON text when needed.

    Strings are parsed with ``json.loads``; anything that is not (or does
    not decode to) a list/tuple yields an empty list.
    """
    if isinstance(raw, str):
        try:
            raw = json.loads(raw)
        except ValueError:
            return []
    if isinstance(raw, (list, tuple)):
        return list(raw)
    return []


def _build_proposed_actions(decisions_log: list) -> list:
    """
    Build proposed_actions from pipeline.state (latest_decision + coordinator_plan).

    The DB decisions_log is used only as a history source for older entries;
    the most recent decision is always taken from live state to avoid stale data.

    Args:
        decisions_log: List of decision dicts; only the first five are read.

    Returns:
        At most five action dicts. An operator override recorded in the
        action log (``'approved'`` / ``'rejected'``) replaces the mapped
        priority of the matching action.
    """
    if pipeline is None:
        return []

    priority_map = {'P0': 'urgent', 'P1': 'watch', 'P2': 'completed'}
    result = []
    seen_ids = set()

    # ── 1. Latest decision (from live state — always up-to-date) ─────────
    latest = pipeline.state.get('latest_decision')
    plan = pipeline.state.get('coordinator_plan') or {}

    if latest is not None:
        d = dataclasses.asdict(latest) if dataclasses.is_dataclass(latest) else dict(latest)
        action_id = str(d.get('frame_id', 'latest'))
        seen_ids.add(action_id)
        override = _action_log.get(action_id)

        # Decode JSON-text actions the same way the history branch does, so
        # the title is never the first character of a JSON string.
        actions = _coerce_actions(d.get('actions')) or plan.get('immediate_actions') or []
        gates = d.get('selected_gates') or plan.get('selected_gates') or []
        just = d.get('justification') or plan.get('actions_justification') or ''
        priority = override or priority_map.get(d.get('priority', 'P2'), 'watch')

        result.append({
            'id': action_id,
            'timestamp': d.get('timestamp', ''),
            'priority': priority,
            'title': actions[0] if actions else plan.get('executive_summary', 'مراقبة'),
            'description': just,
            'all_actions': actions,
            'selected_gates': gates,
            'threat_level': plan.get('threat_level', ''),
            'arabic_alert': plan.get('arabic_alert', ''),
            'confidence': plan.get('confidence_score', 0),
        })

    # ── 2. History from decisions_log (enriched with coordinator plan via DB JOIN) ─
    for d in decisions_log[:5]:
        # NOTE(review): the id(d) fallback is not stable across requests, so
        # an entry without frame_id can never match an approve/reject call.
        action_id = str(d.get('frame_id', id(d)))
        if action_id in seen_ids:
            continue
        seen_ids.add(action_id)
        override = _action_log.get(action_id)

        # Prefer immediate_actions from coordinator plan (richer); fallback to actions
        immediate = (
            _coerce_actions(d.get('immediate_actions'))
            or _coerce_actions(d.get('actions', '[]'))
        )

        result.append({
            'id': action_id,
            'timestamp': d.get('timestamp', ''),
            'priority': override or priority_map.get(d.get('priority', 'P2'), 'watch'),
            'title': immediate[0] if immediate else 'مراقبة',
            'description': d.get('justification', ''),
            'all_actions': immediate,
            'selected_gates': d.get('selected_gates') or [],
            'threat_level': d.get('threat_level', ''),
            'arabic_alert': d.get('arabic_alert', ''),
            'confidence': d.get('confidence', 0),
        })

    return result[:5]


def _serialize_state() -> dict:
    """Produce a JSON-safe snapshot of pipeline.state.

    Works on a shallow copy of ``pipeline.state``: the annotated frame is
    replaced by a base64 JPEG (or ``None``), dataclasses and NumPy values are
    converted, the score fields are coerced to ``float``, and the
    frontend-only fields ``agent_stats``, ``gates`` and ``proposed_actions``
    are injected.
    """
    s = dict(pipeline.state)

    # Annotated frame → base64 JPEG (must happen before the generic
    # sanitizer, which turns every ndarray into None).
    ann = s.get('annotated')
    s['annotated'] = _encode_frame(ann) if isinstance(ann, np.ndarray) else None

    # Decision dataclass → dict, NumPy scalars → Python scalars, for every
    # value in the snapshot.
    s = _to_json_safe(s)

    # Keys the frontend expects to exist even when the pipeline has not
    # produced them yet.
    decisions_log = s.get('decisions_log') or []
    s['decisions_log'] = decisions_log
    s.setdefault('coordinator_plan', None)
    s.setdefault('reflection_summary', {})

    # Score fields are always emitted as floats; a missing or None value
    # becomes 0.0 instead of raising.
    for key in ('risk_score', 'density_score', 'density_ema', 'density_pct', 'fps'):
        s[key] = float(s.get(key) or 0.0)

    # Inject frontend-required fields
    s['agent_stats'] = _build_agent_stats(decisions_log)
    s['gates'] = _build_gates(s.get('risk_level', 'LOW'))
    # Built from live pipeline.state, so it needs its own sanitizing pass.
    s['proposed_actions'] = _to_json_safe(_build_proposed_actions(decisions_log))

    return s


# ── Background pipeline thread ─────────────────────────────────────────
def _reset_for_video_restart() -> None:
    """Zero per-clip pipeline state so the next pass over the video starts fresh."""
    # Reset risk agent (clip state + EMA)
    pipeline.risk._reset_clip(0, 'video loop restart')
    pipeline.risk._peak_ema = 0.0
    pipeline.risk._occ_ema = 0.0
    pipeline.risk._prev_count = 0
    pipeline.risk._prev_density_sc = 0.0
    pipeline.risk._boundary_buf = 0
    pipeline.risk._in_boundary = False
    # Reset perception frame counter
    pipeline.perception.frame_id = 0
    # Reset effective level tracker so first level change fires correctly
    pipeline._last_effective_risk_level = 'LOW'
    # Reset P0 rate-limit so previous loop's cooldown doesn't block
    pipeline.operations._boot_time = datetime.now()
    # Reset pipeline state (scores + decisions)
    pipeline.state['risk_score'] = 0.0
    pipeline.state['risk_level'] = 'LOW'
    pipeline.state['density_pct'] = 0.0
    pipeline.state['frame_id'] = 0
    pipeline.state['decisions_log'] = []
    pipeline.state['latest_decision'] = None
    pipeline.state['arabic_alert'] = ''
    pipeline.state['coordinator_plan'] = None
    # Clear frame buffer so frontend shows fresh frames
    pipeline._frame_buffer.clear()


def _pipeline_loop():
    """Read frames forever, feed them to the pipeline and loop the video.

    Intended to run in a daemon thread. When a read fails the capture is
    rewound to frame 0 and the per-clip state is reset. Exceptions raised
    while processing a frame are printed and the loop continues.
    """
    cap = pipeline.get_video_capture()
    # ~10 FPS: fast enough to look real-time, slow enough for LLM to keep up
    TARGET_FPS = 10
    frame_delay = 1.0 / TARGET_FPS
    print(f'🎥 [API] Video loop started (target {TARGET_FPS} FPS, {frame_delay*1000:.0f}ms/frame)')

    while True:
        t0 = time.time()
        ret, frame = cap.read()
        if not ret:
            # Loop video — full reset so next loop plays fresh
            cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
            _reset_for_video_restart()
            print('🔁 [API] Video loop restarted — full state reset')
            # Pace the retry: if the source keeps failing to deliver frames
            # this would otherwise spin at full CPU.
            time.sleep(frame_delay)
            continue

        try:
            pipeline.process_one_frame(frame)
        except Exception as exc:
            print(f'[API] Frame error: {exc}')

        # Pace to match real video FPS
        elapsed = time.time() - t0
        if elapsed < frame_delay:
            time.sleep(frame_delay - elapsed)


# ── Startup ────────────────────────────────────────────────────────────
@app.on_event('startup')
def startup_event():
    """Construct the global pipeline and start the background video thread."""
    global pipeline
    pipeline = RealTimePipeline(
        video_path=config.VIDEO_PATH,
        groq_api_key=config.GROQ_API_KEY,
        anthropic_key=None,   # VisionCountAgent disabled — YOLO-only mode
        model_path=config.MODEL_PATH,
        db_path=config.DB_PATH,
    )
    thread = threading.Thread(target=_pipeline_loop, daemon=True)
    thread.start()
    print('🚀 [API] HaramGuard REST API ready on http://0.0.0.0:8000')


# ── Endpoints ──────────────────────────────────────────────────────────
@app.get('/api/realtime/state')
def get_state():
    """Return the serialized pipeline state, or 503 while the pipeline is unset."""
    if pipeline is None:
        return JSONResponse({'error': 'pipeline initializing'}, status_code=503)
    return JSONResponse(_serialize_state())


@app.get('/api/frames/buffer')
def get_frame_buffer():
    """Return the last N annotated frames as base64 JPEGs for the frontend scrubber."""
    if pipeline is None:
        return JSONResponse({'error': 'pipeline initializing'}, status_code=503)

    frames = []
    # Iterate over a copy of the buffer — presumably because the pipeline
    # thread keeps appending to it; verify against RealTimePipeline.
    for fr in list(pipeline._frame_buffer):
        if fr.annotated is None:
            continue
        encoded = _encode_frame(fr.annotated)
        if encoded:
            frames.append({
                'frame_id': fr.frame_id,
                'person_count': fr.person_count,
                'track_ids': fr.track_ids,
                'annotated': encoded,
            })
    return JSONResponse({'frames': frames, 'count': len(frames)})


@app.post('/api/actions/{action_id}/approve')
def approve_action(action_id: str):
    """Record *action_id* as approved in the in-memory action log."""
    _action_log[action_id] = 'approved'
    print(f'✅ [API] Action {action_id} approved')
    return {'status': 'ok', 'action_id': action_id, 'result': 'approved'}


@app.post('/api/actions/{action_id}/reject')
def reject_action(action_id: str):
    """Record *action_id* as rejected in the in-memory action log."""
    _action_log[action_id] = 'rejected'
    print(f'❌ [API] Action {action_id} rejected')
    return {'status': 'ok', 'action_id': action_id, 'result': 'rejected'}

@app.post('/api/reset')
def reset_pipeline():
    """Reset pipeline state — video restarts from frame 0, all scores zeroed.

    Performs the same reset as the automatic video-loop restart (risk agent
    including its boundary tracking, perception frame counter, effective
    risk-level tracker, P0 rate-limit boot time, state dict, frame buffer)
    and additionally clears the operator action log.

    Returns:
        ``{'status': 'ok'}`` on success, or a 503 JSON response while the
        pipeline has not been created yet.
    """
    global _action_log
    if pipeline is None:
        return JSONResponse({'error': 'pipeline not ready'}, status_code=503)

    # Function-scope import: this block must not rely on a module-level
    # datetime name being available.
    from datetime import datetime as _dt

    # Reset video to start
    cap = pipeline.get_video_capture()
    cap.set(cv2.CAP_PROP_POS_FRAMES, 0)

    # Reset risk — full clip state + EMA
    pipeline.risk._reset_clip(0, 'API reset')
    pipeline.risk._peak_ema = 0.0
    pipeline.risk._occ_ema = 0.0
    pipeline.risk._prev_count = 0
    pipeline.risk._prev_density_sc = 0.0
    # Boundary tracking is cleared on the automatic loop restart as well;
    # leaving it set here would carry state across a manual reset.
    pipeline.risk._boundary_buf = 0
    pipeline.risk._in_boundary = False

    # Reset perception agent
    pipeline.perception.frame_id = 0

    # Reset effective level tracker so first level change fires correctly
    pipeline._last_effective_risk_level = 'LOW'
    # Reset P0 rate-limit so the previous run's cooldown doesn't block
    pipeline.operations._boot_time = _dt.now()

    # Reset scores
    pipeline.state['risk_score'] = 0.0
    pipeline.state['risk_level'] = 'LOW'
    pipeline.state['density_pct'] = 0.0
    pipeline.state['frame_id'] = 0

    # Clear frame buffer
    pipeline._frame_buffer.clear()

    # Clear decisions
    pipeline.state['decisions_log'] = []
    pipeline.state['latest_decision'] = None
    pipeline.state['arabic_alert'] = ''
    pipeline.state['coordinator_plan'] = None

    # Clear action log
    _action_log = {}

    print('🔄 [API] Pipeline reset — all state zeroed')
    return {'status': 'ok'}


@app.get('/health')
def health():
    """Liveness check; also reports whether the pipeline object exists."""
    return {'status': 'ok', 'pipeline_ready': pipeline is not None}


# ── Entry point ────────────────────────────────────────────────────────
if __name__ == '__main__':
    uvicorn.run('api:app', host='0.0.0.0', port=8000, reload=False)