""" HaramGuard — VisionCountAgent ================================ AISA Layer : Reasoning (Vision) Design Pattern : Tool Use (Claude Vision API) Responsibilities: - Every SAMPLE_EVERY frames, send the frame to Claude Vision - Ask Claude to count persons and assess crowd density - Between samples, return the last known count (interpolation) - Corrects YOLO under-counting in dense/occluded aerial crowds Why Claude Vision? YOLO struggles with: - Heavily occluded persons in dense crowds - Aerial angles with partial body visibility - White ihram clothing blending together Claude Vision understands scene context and can estimate crowd density even when individual detection fails. Integration: Called from PerceptionAgent.process_frame() — replaces or supplements the YOLO person_count with a more accurate value. """ import base64 import json import time import cv2 import numpy as np from typing import Optional class VisionCountAgent: SAMPLE_EVERY = 60 # call Claude every N frames (~2 sec at 30fps) MIN_INTERVAL_SEC = 1.5 # minimum seconds between API calls (rate limit safety) SYSTEM_PROMPT = ( "You are an expert crowd analysis system for Hajj safety management. " "You receive aerial/CCTV footage frames from the Grand Mosque area. " "Your job is to count pilgrims accurately and assess crowd risk." ) USER_PROMPT = ( "Analyze this aerial crowd image from the Grand Mosque (Masjid al-Haram).\n\n" "Count ALL visible persons including:\n" "- Partially visible people at edges\n" "- People in white ihram clothing that may blend together\n" "- People in dense clusters — estimate the cluster size\n" "- People partially occluded by others\n\n" "Respond ONLY with valid JSON, no markdown:\n" "{\n" ' "person_count": ,\n' ' "confidence": <0.0-1.0 — your counting confidence>,\n' ' "crowd_density": "sparse|moderate|dense|critical",\n' ' "zones": {\n' ' "top": , "bottom": , "left": , "right": \n' " },\n" ' "notes": ""\n' "}" ) def __init__(self, api_key: str): """ api_key: Anthropic API key """ import anthropic self.client = anthropic.Anthropic(api_key=api_key) self.name = 'VisionCountAgent' self.aisa_layer = 'Reasoning (Vision)' self._frame_counter = 0 self._last_count = 0 self._last_confidence = 0.0 self._last_density = 'sparse' self._last_zones = {'top': 0, 'bottom': 0, 'left': 0, 'right': 0} self._last_call_time = 0.0 self._call_log = [] # for evaluation print('👁️ [VisionCountAgent] Ready — Claude Vision crowd counting') print(f' Sampling every {self.SAMPLE_EVERY} frames (~{self.SAMPLE_EVERY/30:.1f}s at 30fps)') # ── Private helpers ─────────────────────────────────────────────── def _frame_to_base64(self, frame: np.ndarray) -> str: """Convert BGR numpy frame to base64 JPEG string.""" # Resize to reduce API payload — 640px wide is enough for counting h, w = frame.shape[:2] if w > 640: scale = 640 / w frame = cv2.resize(frame, (640, int(h * scale))) _, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85]) return base64.standard_b64encode(buffer).decode('utf-8') def _call_claude(self, frame: np.ndarray) -> Optional[dict]: """Send frame to Claude Vision and parse response.""" try: img_b64 = self._frame_to_base64(frame) response = self.client.messages.create( model='claude-opus-4-6', max_tokens=300, system=self.SYSTEM_PROMPT, messages=[{ 'role': 'user', 'content': [ { 'type': 'image', 'source': { 'type': 'base64', 'media_type': 'image/jpeg', 'data': img_b64, } }, { 'type': 'text', 'text': self.USER_PROMPT } ] }] ) raw = response.content[0].text.strip() # Strip markdown fences if model adds them raw = raw.replace('```json', '').replace('```', '').strip() result = json.loads(raw) # Validate required fields if 'person_count' not in result: raise ValueError('Missing person_count in response') return result except json.JSONDecodeError as e: print(f' [VisionCountAgent] JSON parse error: {e}') return None except Exception as e: print(f' [VisionCountAgent] API error: {e}') return None # ── Public API ──────────────────────────────────────────────────── def get_count(self, frame: np.ndarray) -> dict: """ Main method — call every frame. Returns accurate count from Claude every SAMPLE_EVERY frames, otherwise returns last known count instantly (no API call). Returns dict: person_count : int confidence : float crowd_density : str zones : dict from_vision : bool — True if this frame used Claude Vision notes : str """ self._frame_counter += 1 now = time.time() should_sample = ( self._frame_counter % self.SAMPLE_EVERY == 0 and (now - self._last_call_time) >= self.MIN_INTERVAL_SEC ) if should_sample: print(f' 👁️ [VisionCountAgent] Sampling frame {self._frame_counter}...') t0 = time.time() result = self._call_claude(frame) elapsed = round(time.time() - t0, 2) if result: self._last_count = int(result.get('person_count', self._last_count)) self._last_confidence = float(result.get('confidence', 0.8)) self._last_density = result.get('crowd_density', 'moderate') self._last_zones = result.get('zones', self._last_zones) self._last_call_time = now # Log for evaluation self._call_log.append({ 'frame': self._frame_counter, 'count': self._last_count, 'confidence': self._last_confidence, 'density': self._last_density, 'latency_sec': elapsed, 'notes': result.get('notes', ''), }) print( f' 👁️ [VisionCountAgent] → {self._last_count} persons | ' f'density={self._last_density} | ' f'conf={self._last_confidence:.2f} | {elapsed}s' ) else: print(f' 👁️ [VisionCountAgent] API failed — using last count: {self._last_count}') return { 'person_count': self._last_count, 'confidence': self._last_confidence, 'crowd_density': self._last_density, 'zones': self._last_zones, 'from_vision': should_sample, 'notes': self._call_log[-1]['notes'] if self._call_log else '', } def get_evaluation_summary(self) -> dict: """Stats for the evaluation section of the capstone.""" if not self._call_log: return {'total_calls': 0} latencies = [c['latency_sec'] for c in self._call_log] counts = [c['count'] for c in self._call_log] return { 'total_calls': len(self._call_log), 'avg_latency_sec': round(sum(latencies) / len(latencies), 2), 'max_latency_sec': round(max(latencies), 2), 'avg_count': round(sum(counts) / len(counts), 1), 'max_count': max(counts), 'density_dist': { d: sum(1 for c in self._call_log if c['density'] == d) for d in ['sparse', 'moderate', 'dense', 'critical'] }, }