Spaces:
Running
Running
| """ | |
| HaramGuard β FastAPI REST Server | |
| ================================== | |
| Runs the video pipeline in a background thread and exposes real-time state | |
| via a JSON REST API for the React frontend dashboard. | |
| Endpoints: | |
| GET /api/realtime/state β serialized pipeline state | |
| POST /api/actions/{id}/approve β log action approval | |
| POST /api/actions/{id}/reject β log action rejection | |
| GET /health β liveness check | |
| Run: | |
| cd backend | |
| python api.py | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import time | |
| import base64 | |
| import threading | |
| import dataclasses | |
| from typing import Any | |
| import cv2 | |
| import numpy as np | |
| import uvicorn | |
| from fastapi import FastAPI | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.responses import JSONResponse | |
| # ββ Make backend importable from its own directory βββββββββββββββββββββ | |
| sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) | |
| import config | |
| from pipeline import RealTimePipeline | |
| # ββ App setup βββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| app = FastAPI(title='HaramGuard API', version='1.0') | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=['*'], | |
| allow_methods=['*'], | |
| allow_headers=['*'], | |
| ) | |
| # ββ Global state ββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| pipeline: RealTimePipeline = None | |
| _action_log: dict = {} # {action_id: 'approved' | 'rejected'} | |
| # ββ Frame serialization βββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _encode_frame(frame: np.ndarray) -> str: | |
| """Convert BGR ndarray to base64 JPEG string for the frontend.""" | |
| try: | |
| _, buf = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 70]) | |
| return base64.b64encode(buf.tobytes()).decode('utf-8') | |
| except Exception: | |
| return None | |
| def _to_json_safe(obj: Any) -> Any: | |
| """Recursively convert non-JSON-serializable objects.""" | |
| if dataclasses.is_dataclass(obj) and not isinstance(obj, type): | |
| return dataclasses.asdict(obj) | |
| if isinstance(obj, np.ndarray): | |
| return None | |
| if isinstance(obj, (np.integer,)): | |
| return int(obj) | |
| if isinstance(obj, (np.floating,)): | |
| return float(obj) | |
| if isinstance(obj, dict): | |
| return {k: _to_json_safe(v) for k, v in obj.items()} | |
| if isinstance(obj, list): | |
| return [_to_json_safe(v) for v in obj] | |
| return obj | |
| # ββ State enrichment ββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def _build_agent_stats(decisions_log: list) -> dict: | |
| """Derive agent_stats from decisions log and action overrides.""" | |
| urgent = sum(1 for d in decisions_log if d.get('priority') == 'P0') | |
| pending = sum( | |
| 1 for d in decisions_log | |
| if str(d.get('frame_id', '')) not in _action_log | |
| and d.get('priority') in ('P0', 'P1') | |
| ) | |
| resolved = sum(1 for v in _action_log.values() if v == 'approved') | |
| return { | |
| 'resolved_alerts': resolved, | |
| 'pending_decisions': pending, | |
| 'urgent_interventions': urgent, | |
| } | |
| def _build_gates(risk_level: str) -> list: | |
| """Generate 6 gate statuses derived from current risk level.""" | |
| if risk_level == 'HIGH': | |
| statuses = ['open', 'partial', 'open', 'closed', 'open', 'partial'] | |
| elif risk_level == 'MEDIUM': | |
| statuses = ['open', 'open', 'partial', 'open', 'open', 'open'] | |
| else: | |
| statuses = ['open', 'open', 'open', 'open', 'open', 'open'] | |
| names = [ | |
| 'Ψ¨ΩΨ§Ψ¨Ψ© Ψ§ΩΩ ΩΩ ΨΉΨ¨Ψ―Ψ§ΩΨΉΨ²ΩΨ²', 'Ψ¨ΩΨ§Ψ¨Ψ© Ψ§ΩΨ³ΩΨ§Ω ', 'Ψ¨ΩΨ§Ψ¨Ψ© Ψ§ΩΩΩΨ―', | |
| 'Ψ¨ΩΨ§Ψ¨Ψ© ΨΉΩ Ψ±', 'Ψ¨ΩΨ§Ψ¨Ψ© Ψ§ΩΨ΄Ψ¨ΩΩΨ©', 'Ψ¨ΩΨ§Ψ¨Ψ© Ψ§ΩΩ Ψ±ΩΨ©', | |
| ] | |
| return [ | |
| {'id': str(i + 1), 'name': names[i], 'status': statuses[i]} | |
| for i in range(6) | |
| ] | |
| def _build_proposed_actions(decisions_log: list) -> list: | |
| """ | |
| Build proposed_actions from pipeline.state (latest_decision + coordinator_plan). | |
| The DB decisions_log is used only as a history source for older entries; | |
| the most recent decision is always taken from live state to avoid stale data. | |
| """ | |
| if not pipeline: | |
| return [] | |
| priority_map = {'P0': 'urgent', 'P1': 'watch', 'P2': 'completed'} | |
| result = [] | |
| seen_ids = set() | |
| # ββ 1. Latest decision (from live state β always up-to-date) βββββββββ | |
| latest = pipeline.state.get('latest_decision') | |
| plan = pipeline.state.get('coordinator_plan') or {} | |
| if latest is not None: | |
| import dataclasses | |
| d = dataclasses.asdict(latest) if dataclasses.is_dataclass(latest) else dict(latest) | |
| action_id = str(d.get('frame_id', 'latest')) | |
| seen_ids.add(action_id) | |
| override = _action_log.get(action_id) | |
| actions = d.get('actions') or plan.get('immediate_actions') or [] | |
| gates = d.get('selected_gates') or plan.get('selected_gates') or [] | |
| just = d.get('justification') or plan.get('actions_justification') or '' | |
| priority = override or priority_map.get(d.get('priority', 'P2'), 'watch') | |
| result.append({ | |
| 'id': action_id, | |
| 'timestamp': d.get('timestamp', ''), | |
| 'priority': priority, | |
| 'title': actions[0] if actions else plan.get('executive_summary', 'Ω Ψ±Ψ§ΩΨ¨Ψ©'), | |
| 'description': just, | |
| 'all_actions': actions, | |
| 'selected_gates': gates, | |
| 'threat_level': plan.get('threat_level', ''), | |
| 'arabic_alert': plan.get('arabic_alert', ''), | |
| 'confidence': plan.get('confidence_score', 0), | |
| }) | |
| # ββ 2. History from decisions_log (enriched with coordinator plan via DB JOIN) β | |
| for d in decisions_log[:5]: | |
| action_id = str(d.get('frame_id', id(d))) | |
| if action_id in seen_ids: | |
| continue | |
| seen_ids.add(action_id) | |
| override = _action_log.get(action_id) | |
| # Prefer immediate_actions from coordinator plan (richer); fallback to actions | |
| immediate = d.get('immediate_actions') or [] | |
| if not immediate: | |
| raw_actions = d.get('actions', '[]') | |
| if isinstance(raw_actions, str): | |
| try: | |
| immediate = json.loads(raw_actions) | |
| except Exception: | |
| immediate = [] | |
| elif isinstance(raw_actions, list): | |
| immediate = raw_actions | |
| selected_gates = d.get('selected_gates') or [] | |
| justification = d.get('justification', '') | |
| base_priority = priority_map.get(d.get('priority', 'P2'), 'watch') | |
| result.append({ | |
| 'id': action_id, | |
| 'timestamp': d.get('timestamp', ''), | |
| 'priority': override or base_priority, | |
| 'title': immediate[0] if immediate else 'Ω Ψ±Ψ§ΩΨ¨Ψ©', | |
| 'description': justification, | |
| 'all_actions': immediate, | |
| 'selected_gates': selected_gates, | |
| 'threat_level': d.get('threat_level', ''), | |
| 'arabic_alert': d.get('arabic_alert', ''), | |
| 'confidence': d.get('confidence', 0), | |
| }) | |
| return result[:5] | |
| def _serialize_state() -> dict: | |
| """Produce a JSON-safe snapshot of pipeline.state.""" | |
| s = dict(pipeline.state) | |
| # Annotated frame β base64 JPEG | |
| ann = s.get('annotated') | |
| s['annotated'] = _encode_frame(ann) if isinstance(ann, np.ndarray) else None | |
| # Decision dataclass β dict | |
| ld = s.get('latest_decision') | |
| if ld is not None and dataclasses.is_dataclass(ld): | |
| s['latest_decision'] = dataclasses.asdict(ld) | |
| # decisions_log is already list[dict] from DB | |
| decisions_log = _to_json_safe(s.get('decisions_log', [])) | |
| s['decisions_log'] = decisions_log | |
| # Scalar numpy types β Python floats/ints | |
| s['risk_score'] = float(s.get('risk_score', 0.0)) | |
| s['density_score'] = float(s.get('density_score', 0.0)) | |
| s['density_ema'] = float(s.get('density_ema', 0.0)) | |
| s['density_pct'] = float(s.get('density_pct', 0.0)) | |
| s['fps'] = float(s.get('fps', 0.0)) | |
| # Coordinator plan deep-copy safe | |
| s['coordinator_plan'] = _to_json_safe(s.get('coordinator_plan')) | |
| # Reflection summary | |
| s['reflection_summary'] = _to_json_safe(s.get('reflection_summary', {})) | |
| # Inject frontend-required fields | |
| s['agent_stats'] = _build_agent_stats(decisions_log) | |
| s['gates'] = _build_gates(s.get('risk_level', 'LOW')) | |
| s['proposed_actions'] = _build_proposed_actions(decisions_log) | |
| return s | |
| # ββ Background pipeline thread βββββββββββββββββββββββββββββββββββββββββ | |
| def _pipeline_loop(): | |
| cap = pipeline.get_video_capture() | |
| # ~10 FPS: fast enough to look real-time, slow enough for LLM to keep up | |
| TARGET_FPS = 10 | |
| frame_delay = 1.0 / TARGET_FPS | |
| print(f'π₯ [API] Video loop started (target {TARGET_FPS} FPS, {frame_delay*1000:.0f}ms/frame)') | |
| while True: | |
| t0 = time.time() | |
| ret, frame = cap.read() | |
| if not ret: | |
| # Loop video β full reset so next loop plays fresh | |
| cap.set(cv2.CAP_PROP_POS_FRAMES, 0) | |
| # Reset risk agent (clip state + EMA) | |
| pipeline.risk._reset_clip(0, 'video loop restart') | |
| pipeline.risk._peak_ema = 0.0 | |
| pipeline.risk._occ_ema = 0.0 | |
| pipeline.risk._prev_count = 0 | |
| pipeline.risk._prev_density_sc = 0.0 | |
| pipeline.risk._boundary_buf = 0 | |
| pipeline.risk._in_boundary = False | |
| # Reset perception frame counter | |
| pipeline.perception.frame_id = 0 | |
| # Reset effective level tracker so first level change fires correctly | |
| pipeline._last_effective_risk_level = 'LOW' | |
| # Reset P0 rate-limit so previous loop's cooldown doesn't block | |
| from datetime import datetime as _dt | |
| pipeline.operations._boot_time = _dt.now() | |
| # Reset pipeline state (scores + decisions) | |
| pipeline.state['risk_score'] = 0.0 | |
| pipeline.state['risk_level'] = 'LOW' | |
| pipeline.state['density_pct'] = 0.0 | |
| pipeline.state['frame_id'] = 0 | |
| pipeline.state['decisions_log'] = [] | |
| pipeline.state['latest_decision'] = None | |
| pipeline.state['arabic_alert'] = '' | |
| pipeline.state['coordinator_plan'] = None | |
| # Clear frame buffer so frontend shows fresh frames | |
| pipeline._frame_buffer.clear() | |
| print('π [API] Video loop restarted β full state reset') | |
| continue | |
| try: | |
| pipeline.process_one_frame(frame) | |
| except Exception as exc: | |
| print(f'[API] Frame error: {exc}') | |
| # Pace to match real video FPS | |
| elapsed = time.time() - t0 | |
| if elapsed < frame_delay: | |
| time.sleep(frame_delay - elapsed) | |
| # ββ Startup ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def startup_event(): | |
| global pipeline | |
| pipeline = RealTimePipeline( | |
| video_path = config.VIDEO_PATH, | |
| groq_api_key = config.GROQ_API_KEY, | |
| anthropic_key = None, # VisionCountAgent disabled β YOLO-only mode | |
| model_path = config.MODEL_PATH, | |
| db_path = config.DB_PATH, | |
| ) | |
| thread = threading.Thread(target=_pipeline_loop, daemon=True) | |
| thread.start() | |
| print('π [API] HaramGuard REST API ready on http://0.0.0.0:8000') | |
| # ββ Endpoints ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def get_state(): | |
| if pipeline is None: | |
| return JSONResponse({'error': 'pipeline initializing'}, status_code=503) | |
| return JSONResponse(_serialize_state()) | |
| def get_frame_buffer(): | |
| """Return the last N annotated frames as base64 JPEGs for the frontend scrubber.""" | |
| if pipeline is None: | |
| return JSONResponse({'error': 'pipeline initializing'}, status_code=503) | |
| frames = [] | |
| for fr in list(pipeline._frame_buffer): | |
| if fr.annotated is not None: | |
| encoded = _encode_frame(fr.annotated) | |
| if encoded: | |
| frames.append({ | |
| 'frame_id': fr.frame_id, | |
| 'person_count': fr.person_count, | |
| 'track_ids': fr.track_ids, | |
| 'annotated': encoded, | |
| }) | |
| return JSONResponse({'frames': frames, 'count': len(frames)}) | |
| def approve_action(action_id: str): | |
| _action_log[action_id] = 'approved' | |
| print(f'β [API] Action {action_id} approved') | |
| return {'status': 'ok', 'action_id': action_id, 'result': 'approved'} | |
| def reject_action(action_id: str): | |
| _action_log[action_id] = 'rejected' | |
| print(f'β [API] Action {action_id} rejected') | |
| return {'status': 'ok', 'action_id': action_id, 'result': 'rejected'} | |
| def reset_pipeline(): | |
| """Reset pipeline state β video restarts from frame 0, all scores zeroed.""" | |
| global _action_log | |
| if pipeline is None: | |
| return JSONResponse({'error': 'pipeline not ready'}, status_code=503) | |
| # Reset video to start | |
| cap = pipeline.get_video_capture() | |
| cap.set(cv2.CAP_PROP_POS_FRAMES, 0) | |
| # Reset risk β full clip state + EMA | |
| pipeline.risk._reset_clip(0, 'API reset') | |
| pipeline.risk._peak_ema = 0.0 | |
| pipeline.risk._occ_ema = 0.0 | |
| pipeline.risk._prev_count = 0 | |
| pipeline.risk._prev_density_sc = 0.0 | |
| pipeline.state['risk_score'] = 0.0 | |
| pipeline.state['risk_level'] = 'LOW' | |
| pipeline.state['density_pct'] = 0.0 | |
| pipeline.state['frame_id'] = 0 | |
| # Reset perception agent | |
| pipeline.perception.frame_id = 0 | |
| # Clear frame buffer | |
| pipeline._frame_buffer.clear() | |
| # Clear decisions | |
| pipeline.state['decisions_log'] = [] | |
| pipeline.state['latest_decision'] = None | |
| pipeline.state['arabic_alert'] = '' | |
| pipeline.state['coordinator_plan'] = None | |
| # Clear action log | |
| _action_log = {} | |
| print('π [API] Pipeline reset β all state zeroed') | |
| return {'status': 'ok'} | |
| def health(): | |
| return {'status': 'ok', 'pipeline_ready': pipeline is not None} | |
| # ββ Entry point ββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| if __name__ == '__main__': | |
| uvicorn.run('api:app', host='0.0.0.0', port=8000, reload=False) | |