adeem6's picture
Update backend/api.py (#8)
e89e5aa
"""
HaramGuard β€” FastAPI REST Server
==================================
Runs the video pipeline in a background thread and exposes real-time state
via a JSON REST API for the React frontend dashboard.
Endpoints:
GET /api/realtime/state β†’ serialized pipeline state
POST /api/actions/{id}/approve β†’ log action approval
POST /api/actions/{id}/reject β†’ log action rejection
GET /health β†’ liveness check
Run:
cd backend
python api.py
"""
import os
import sys
import json
import time
import base64
import threading
import dataclasses
from typing import Any
import cv2
import numpy as np
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
# ── Make backend importable from its own directory ─────────────────────
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
import config
from pipeline import RealTimePipeline
# ── App setup ─────────────────────────────────────────────────────────
app = FastAPI(title='HaramGuard API', version='1.0')
app.add_middleware(
CORSMiddleware,
allow_origins=['*'],
allow_methods=['*'],
allow_headers=['*'],
)
# ── Global state ──────────────────────────────────────────────────────
pipeline: RealTimePipeline = None
_action_log: dict = {} # {action_id: 'approved' | 'rejected'}
# ── Frame serialization ───────────────────────────────────────────────
def _encode_frame(frame: np.ndarray) -> str:
"""Convert BGR ndarray to base64 JPEG string for the frontend."""
try:
_, buf = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 70])
return base64.b64encode(buf.tobytes()).decode('utf-8')
except Exception:
return None
def _to_json_safe(obj: Any) -> Any:
"""Recursively convert non-JSON-serializable objects."""
if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
return dataclasses.asdict(obj)
if isinstance(obj, np.ndarray):
return None
if isinstance(obj, (np.integer,)):
return int(obj)
if isinstance(obj, (np.floating,)):
return float(obj)
if isinstance(obj, dict):
return {k: _to_json_safe(v) for k, v in obj.items()}
if isinstance(obj, list):
return [_to_json_safe(v) for v in obj]
return obj
# ── State enrichment ──────────────────────────────────────────────────
def _build_agent_stats(decisions_log: list) -> dict:
"""Derive agent_stats from decisions log and action overrides."""
urgent = sum(1 for d in decisions_log if d.get('priority') == 'P0')
pending = sum(
1 for d in decisions_log
if str(d.get('frame_id', '')) not in _action_log
and d.get('priority') in ('P0', 'P1')
)
resolved = sum(1 for v in _action_log.values() if v == 'approved')
return {
'resolved_alerts': resolved,
'pending_decisions': pending,
'urgent_interventions': urgent,
}
def _build_gates(risk_level: str) -> list:
"""Generate 6 gate statuses derived from current risk level."""
if risk_level == 'HIGH':
statuses = ['open', 'partial', 'open', 'closed', 'open', 'partial']
elif risk_level == 'MEDIUM':
statuses = ['open', 'open', 'partial', 'open', 'open', 'open']
else:
statuses = ['open', 'open', 'open', 'open', 'open', 'open']
names = [
'بوابة Ψ§Ω„Ω…Ω„Ωƒ ΨΉΨ¨Ψ―Ψ§Ω„ΨΉΨ²ΩŠΨ²', 'بوابة Ψ§Ω„Ψ³Ω„Ψ§Ω…', 'بوابة الفهد',
'بوابة ΨΉΩ…Ψ±', 'بوابة Ψ§Ω„Ψ΄Ψ¨ΩŠΩƒΨ©', 'بوابة Ψ§Ω„Ω…Ψ±ΩˆΨ©',
]
return [
{'id': str(i + 1), 'name': names[i], 'status': statuses[i]}
for i in range(6)
]
def _build_proposed_actions(decisions_log: list) -> list:
"""
Build proposed_actions from pipeline.state (latest_decision + coordinator_plan).
The DB decisions_log is used only as a history source for older entries;
the most recent decision is always taken from live state to avoid stale data.
"""
if not pipeline:
return []
priority_map = {'P0': 'urgent', 'P1': 'watch', 'P2': 'completed'}
result = []
seen_ids = set()
# ── 1. Latest decision (from live state β€” always up-to-date) ─────────
latest = pipeline.state.get('latest_decision')
plan = pipeline.state.get('coordinator_plan') or {}
if latest is not None:
import dataclasses
d = dataclasses.asdict(latest) if dataclasses.is_dataclass(latest) else dict(latest)
action_id = str(d.get('frame_id', 'latest'))
seen_ids.add(action_id)
override = _action_log.get(action_id)
actions = d.get('actions') or plan.get('immediate_actions') or []
gates = d.get('selected_gates') or plan.get('selected_gates') or []
just = d.get('justification') or plan.get('actions_justification') or ''
priority = override or priority_map.get(d.get('priority', 'P2'), 'watch')
result.append({
'id': action_id,
'timestamp': d.get('timestamp', ''),
'priority': priority,
'title': actions[0] if actions else plan.get('executive_summary', 'Ω…Ψ±Ψ§Ω‚Ψ¨Ψ©'),
'description': just,
'all_actions': actions,
'selected_gates': gates,
'threat_level': plan.get('threat_level', ''),
'arabic_alert': plan.get('arabic_alert', ''),
'confidence': plan.get('confidence_score', 0),
})
# ── 2. History from decisions_log (enriched with coordinator plan via DB JOIN) ─
for d in decisions_log[:5]:
action_id = str(d.get('frame_id', id(d)))
if action_id in seen_ids:
continue
seen_ids.add(action_id)
override = _action_log.get(action_id)
# Prefer immediate_actions from coordinator plan (richer); fallback to actions
immediate = d.get('immediate_actions') or []
if not immediate:
raw_actions = d.get('actions', '[]')
if isinstance(raw_actions, str):
try:
immediate = json.loads(raw_actions)
except Exception:
immediate = []
elif isinstance(raw_actions, list):
immediate = raw_actions
selected_gates = d.get('selected_gates') or []
justification = d.get('justification', '')
base_priority = priority_map.get(d.get('priority', 'P2'), 'watch')
result.append({
'id': action_id,
'timestamp': d.get('timestamp', ''),
'priority': override or base_priority,
'title': immediate[0] if immediate else 'Ω…Ψ±Ψ§Ω‚Ψ¨Ψ©',
'description': justification,
'all_actions': immediate,
'selected_gates': selected_gates,
'threat_level': d.get('threat_level', ''),
'arabic_alert': d.get('arabic_alert', ''),
'confidence': d.get('confidence', 0),
})
return result[:5]
def _serialize_state() -> dict:
"""Produce a JSON-safe snapshot of pipeline.state."""
s = dict(pipeline.state)
# Annotated frame β†’ base64 JPEG
ann = s.get('annotated')
s['annotated'] = _encode_frame(ann) if isinstance(ann, np.ndarray) else None
# Decision dataclass β†’ dict
ld = s.get('latest_decision')
if ld is not None and dataclasses.is_dataclass(ld):
s['latest_decision'] = dataclasses.asdict(ld)
# decisions_log is already list[dict] from DB
decisions_log = _to_json_safe(s.get('decisions_log', []))
s['decisions_log'] = decisions_log
# Scalar numpy types β†’ Python floats/ints
s['risk_score'] = float(s.get('risk_score', 0.0))
s['density_score'] = float(s.get('density_score', 0.0))
s['density_ema'] = float(s.get('density_ema', 0.0))
s['density_pct'] = float(s.get('density_pct', 0.0))
s['fps'] = float(s.get('fps', 0.0))
# Coordinator plan deep-copy safe
s['coordinator_plan'] = _to_json_safe(s.get('coordinator_plan'))
# Reflection summary
s['reflection_summary'] = _to_json_safe(s.get('reflection_summary', {}))
# Inject frontend-required fields
s['agent_stats'] = _build_agent_stats(decisions_log)
s['gates'] = _build_gates(s.get('risk_level', 'LOW'))
s['proposed_actions'] = _build_proposed_actions(decisions_log)
return s
# ── Background pipeline thread ─────────────────────────────────────────
def _pipeline_loop():
cap = pipeline.get_video_capture()
# ~10 FPS: fast enough to look real-time, slow enough for LLM to keep up
TARGET_FPS = 10
frame_delay = 1.0 / TARGET_FPS
print(f'πŸŽ₯ [API] Video loop started (target {TARGET_FPS} FPS, {frame_delay*1000:.0f}ms/frame)')
while True:
t0 = time.time()
ret, frame = cap.read()
if not ret:
# Loop video β€” full reset so next loop plays fresh
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
# Reset risk agent (clip state + EMA)
pipeline.risk._reset_clip(0, 'video loop restart')
pipeline.risk._peak_ema = 0.0
pipeline.risk._occ_ema = 0.0
pipeline.risk._prev_count = 0
pipeline.risk._prev_density_sc = 0.0
pipeline.risk._boundary_buf = 0
pipeline.risk._in_boundary = False
# Reset perception frame counter
pipeline.perception.frame_id = 0
# Reset effective level tracker so first level change fires correctly
pipeline._last_effective_risk_level = 'LOW'
# Reset P0 rate-limit so previous loop's cooldown doesn't block
from datetime import datetime as _dt
pipeline.operations._boot_time = _dt.now()
# Reset pipeline state (scores + decisions)
pipeline.state['risk_score'] = 0.0
pipeline.state['risk_level'] = 'LOW'
pipeline.state['density_pct'] = 0.0
pipeline.state['frame_id'] = 0
pipeline.state['decisions_log'] = []
pipeline.state['latest_decision'] = None
pipeline.state['arabic_alert'] = ''
pipeline.state['coordinator_plan'] = None
# Clear frame buffer so frontend shows fresh frames
pipeline._frame_buffer.clear()
print('πŸ” [API] Video loop restarted β€” full state reset')
continue
try:
pipeline.process_one_frame(frame)
except Exception as exc:
print(f'[API] Frame error: {exc}')
# Pace to match real video FPS
elapsed = time.time() - t0
if elapsed < frame_delay:
time.sleep(frame_delay - elapsed)
# ── Startup ────────────────────────────────────────────────────────────
@app.on_event('startup')
def startup_event():
global pipeline
pipeline = RealTimePipeline(
video_path = config.VIDEO_PATH,
groq_api_key = config.GROQ_API_KEY,
anthropic_key = None, # VisionCountAgent disabled β€” YOLO-only mode
model_path = config.MODEL_PATH,
db_path = config.DB_PATH,
)
thread = threading.Thread(target=_pipeline_loop, daemon=True)
thread.start()
print('πŸš€ [API] HaramGuard REST API ready on http://0.0.0.0:8000')
# ── Endpoints ──────────────────────────────────────────────────────────
@app.get('/api/realtime/state')
def get_state():
if pipeline is None:
return JSONResponse({'error': 'pipeline initializing'}, status_code=503)
return JSONResponse(_serialize_state())
@app.get('/api/frames/buffer')
def get_frame_buffer():
"""Return the last N annotated frames as base64 JPEGs for the frontend scrubber."""
if pipeline is None:
return JSONResponse({'error': 'pipeline initializing'}, status_code=503)
frames = []
for fr in list(pipeline._frame_buffer):
if fr.annotated is not None:
encoded = _encode_frame(fr.annotated)
if encoded:
frames.append({
'frame_id': fr.frame_id,
'person_count': fr.person_count,
'track_ids': fr.track_ids,
'annotated': encoded,
})
return JSONResponse({'frames': frames, 'count': len(frames)})
@app.post('/api/actions/{action_id}/approve')
def approve_action(action_id: str):
_action_log[action_id] = 'approved'
print(f'βœ… [API] Action {action_id} approved')
return {'status': 'ok', 'action_id': action_id, 'result': 'approved'}
@app.post('/api/actions/{action_id}/reject')
def reject_action(action_id: str):
_action_log[action_id] = 'rejected'
print(f'❌ [API] Action {action_id} rejected')
return {'status': 'ok', 'action_id': action_id, 'result': 'rejected'}
@app.post('/api/reset')
def reset_pipeline():
"""Reset pipeline state β€” video restarts from frame 0, all scores zeroed."""
global _action_log
if pipeline is None:
return JSONResponse({'error': 'pipeline not ready'}, status_code=503)
# Reset video to start
cap = pipeline.get_video_capture()
cap.set(cv2.CAP_PROP_POS_FRAMES, 0)
# Reset risk β€” full clip state + EMA
pipeline.risk._reset_clip(0, 'API reset')
pipeline.risk._peak_ema = 0.0
pipeline.risk._occ_ema = 0.0
pipeline.risk._prev_count = 0
pipeline.risk._prev_density_sc = 0.0
pipeline.state['risk_score'] = 0.0
pipeline.state['risk_level'] = 'LOW'
pipeline.state['density_pct'] = 0.0
pipeline.state['frame_id'] = 0
# Reset perception agent
pipeline.perception.frame_id = 0
# Clear frame buffer
pipeline._frame_buffer.clear()
# Clear decisions
pipeline.state['decisions_log'] = []
pipeline.state['latest_decision'] = None
pipeline.state['arabic_alert'] = ''
pipeline.state['coordinator_plan'] = None
# Clear action log
_action_log = {}
print('πŸ”„ [API] Pipeline reset β€” all state zeroed')
return {'status': 'ok'}
@app.get('/health')
def health():
return {'status': 'ok', 'pipeline_ready': pipeline is not None}
# ── Entry point ────────────────────────────────────────────────────────
if __name__ == '__main__':
uvicorn.run('api:app', host='0.0.0.0', port=8000, reload=False)