Spaces:
Running
Running
| """ | |
| HaramGuard β VisionCountAgent | |
| ================================ | |
| AISA Layer : Reasoning (Vision) | |
| Design Pattern : Tool Use (Claude Vision API) | |
| Responsibilities: | |
| - Every SAMPLE_EVERY frames, send the frame to Claude Vision | |
| - Ask Claude to count persons and assess crowd density | |
| - Between samples, return the last known count (interpolation) | |
| - Corrects YOLO under-counting in dense/occluded aerial crowds | |
| Why Claude Vision? | |
| YOLO struggles with: | |
| - Heavily occluded persons in dense crowds | |
| - Aerial angles with partial body visibility | |
| - White ihram clothing blending together | |
| Claude Vision understands scene context and can estimate | |
| crowd density even when individual detection fails. | |
| Integration: | |
| Called from PerceptionAgent.process_frame() β replaces or | |
| supplements the YOLO person_count with a more accurate value. | |
| """ | |
| import base64 | |
| import json | |
| import time | |
| import cv2 | |
| import numpy as np | |
| from typing import Optional | |
class VisionCountAgent:
    """Claude Vision-based crowd counter (AISA layer: Reasoning / Vision).

    Every ``SAMPLE_EVERY`` frames the current frame is encoded to JPEG and
    sent to Claude Vision, which returns a person count, a confidence value
    and a density assessment. Between samples the last known values are
    returned instantly, so the per-frame cost is near zero and the API is
    hit only about once every couple of seconds.
    """

    # Call Claude every N frames (~2 s at 30 fps).
    SAMPLE_EVERY = 60
    # Minimum seconds between API calls (rate-limit safety net).
    MIN_INTERVAL_SEC = 1.5

    SYSTEM_PROMPT = (
        "You are an expert crowd analysis system for Hajj safety management. "
        "You receive aerial/CCTV footage frames from the Grand Mosque area. "
        "Your job is to count pilgrims accurately and assess crowd risk."
    )

    USER_PROMPT = (
        "Analyze this aerial crowd image from the Grand Mosque (Masjid al-Haram).\n\n"
        "Count ALL visible persons including:\n"
        "- Partially visible people at edges\n"
        "- People in white ihram clothing that may blend together\n"
        "- People in dense clusters β estimate the cluster size\n"
        "- People partially occluded by others\n\n"
        "Respond ONLY with valid JSON, no markdown:\n"
        "{\n"
        ' "person_count": <integer β total visible persons>,\n'
        ' "confidence": <0.0-1.0 β your counting confidence>,\n'
        ' "crowd_density": "sparse|moderate|dense|critical",\n'
        ' "zones": {\n'
        ' "top": <count>, "bottom": <count>, "left": <count>, "right": <count>\n'
        " },\n"
        ' "notes": "<any important observations>"\n'
        "}"
    )

    def __init__(self, api_key: str):
        """
        api_key: Anthropic API key
        """
        # Local import keeps the module importable when the SDK is absent
        # (e.g. in unit tests that never construct the agent).
        import anthropic
        self.client = anthropic.Anthropic(api_key=api_key)
        self.name = 'VisionCountAgent'
        self.aisa_layer = 'Reasoning (Vision)'
        self._frame_counter = 0          # frames seen so far
        self._last_count = 0             # most recent vision-derived count
        self._last_confidence = 0.0
        self._last_density = 'sparse'
        self._last_zones = {'top': 0, 'bottom': 0, 'left': 0, 'right': 0}
        self._last_call_time = 0.0       # wall-clock time of last successful call
        self._call_log = []              # per-call records for get_evaluation_summary()
        print('ποΈ [VisionCountAgent] Ready β Claude Vision crowd counting')
        print(f' Sampling every {self.SAMPLE_EVERY} frames (~{self.SAMPLE_EVERY/30:.1f}s at 30fps)')

    # -- Private helpers ------------------------------------------------

    def _frame_to_base64(self, frame: np.ndarray) -> str:
        """Convert a BGR numpy frame to a base64-encoded JPEG string.

        Frames wider than 640 px are downscaled (aspect preserved) to keep
        the API payload small β 640 px is sufficient for counting.
        """
        h, w = frame.shape[:2]
        if w > 640:
            scale = 640 / w
            frame = cv2.resize(frame, (640, int(h * scale)))
        _, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
        return base64.standard_b64encode(buffer).decode('utf-8')

    def _call_claude(self, frame: np.ndarray) -> Optional[dict]:
        """Send one frame to Claude Vision and parse the JSON reply.

        Returns the parsed dict on success, or None on any API/parse
        failure (callers fall back to the last known count).
        """
        try:
            img_b64 = self._frame_to_base64(frame)
            response = self.client.messages.create(
                model='claude-opus-4-6',
                max_tokens=300,
                system=self.SYSTEM_PROMPT,
                messages=[{
                    'role': 'user',
                    'content': [
                        {
                            'type': 'image',
                            'source': {
                                'type': 'base64',
                                'media_type': 'image/jpeg',
                                'data': img_b64,
                            }
                        },
                        {
                            'type': 'text',
                            'text': self.USER_PROMPT
                        }
                    ]
                }]
            )
            raw = response.content[0].text.strip()
            # Strip markdown fences if the model adds them despite the prompt.
            raw = raw.replace('```json', '').replace('```', '').strip()
            result = json.loads(raw)
            # Validate required fields before trusting the reply.
            if 'person_count' not in result:
                raise ValueError('Missing person_count in response')
            return result
        except json.JSONDecodeError as e:
            print(f' [VisionCountAgent] JSON parse error: {e}')
            return None
        except Exception as e:
            print(f' [VisionCountAgent] API error: {e}')
            return None

    # -- Public API -----------------------------------------------------

    def get_count(self, frame: np.ndarray) -> dict:
        """
        Main method β call every frame.
        Returns an accurate count from Claude every SAMPLE_EVERY frames,
        otherwise returns the last known count instantly (no API call).

        Returns dict:
            person_count  : int
            confidence    : float
            crowd_density : str
            zones         : dict
            from_vision   : bool β True only if THIS call obtained fresh
                            vision data (False on sampling frames where
                            the API call failed)
            notes         : str
        """
        self._frame_counter += 1
        now = time.time()
        should_sample = (
            self._frame_counter % self.SAMPLE_EVERY == 0
            and (now - self._last_call_time) >= self.MIN_INTERVAL_SEC
        )
        # Bug fix: previously `from_vision` mirrored `should_sample`, which
        # falsely reported vision-derived data when the API call failed.
        fresh = False
        if should_sample:
            print(f' ποΈ [VisionCountAgent] Sampling frame {self._frame_counter}...')
            t0 = time.time()
            result = self._call_claude(frame)
            elapsed = round(time.time() - t0, 2)
            if result:
                fresh = True
                # Clamp model-reported values into sane ranges β a malformed
                # reply must not propagate a negative count or confidence > 1.
                self._last_count = max(0, int(result.get('person_count', self._last_count)))
                self._last_confidence = min(1.0, max(0.0, float(result.get('confidence', 0.8))))
                self._last_density = result.get('crowd_density', 'moderate')
                self._last_zones = result.get('zones', self._last_zones)
                self._last_call_time = now
                # Log for evaluation
                self._call_log.append({
                    'frame': self._frame_counter,
                    'count': self._last_count,
                    'confidence': self._last_confidence,
                    'density': self._last_density,
                    'latency_sec': elapsed,
                    'notes': result.get('notes', ''),
                })
                print(
                    f' ποΈ [VisionCountAgent] β {self._last_count} persons | '
                    f'density={self._last_density} | '
                    f'conf={self._last_confidence:.2f} | {elapsed}s'
                )
            else:
                print(f' ποΈ [VisionCountAgent] API failed β using last count: {self._last_count}')
        return {
            'person_count': self._last_count,
            'confidence': self._last_confidence,
            'crowd_density': self._last_density,
            'zones': self._last_zones,
            'from_vision': fresh,
            'notes': self._call_log[-1]['notes'] if self._call_log else '',
        }

    def get_evaluation_summary(self) -> dict:
        """Aggregate stats over all successful API calls.

        Returns {'total_calls': 0} when no call has succeeded yet, so
        callers never divide by zero.
        """
        if not self._call_log:
            return {'total_calls': 0}
        latencies = [c['latency_sec'] for c in self._call_log]
        counts = [c['count'] for c in self._call_log]
        return {
            'total_calls': len(self._call_log),
            'avg_latency_sec': round(sum(latencies) / len(latencies), 2),
            'max_latency_sec': round(max(latencies), 2),
            'avg_count': round(sum(counts) / len(counts), 1),
            'max_count': max(counts),
            'density_dist': {
                d: sum(1 for c in self._call_log if c['density'] == d)
                for d in ['sparse', 'moderate', 'dense', 'critical']
            },
        }