# HaramGuard / backend / agents / vision_count_agent.py
# (hosting-page residue: "adeem6's picture", "Upload 52 files", "f492127 verified")
"""
HaramGuard — VisionCountAgent
================================
AISA Layer : Reasoning (Vision)
Design Pattern : Tool Use (Claude Vision API)
Responsibilities:
- Every SAMPLE_EVERY frames, send the frame to Claude Vision
- Ask Claude to count persons and assess crowd density
- Between samples, return the last known count (interpolation)
- Corrects YOLO under-counting in dense/occluded aerial crowds
Why Claude Vision?
YOLO struggles with:
- Heavily occluded persons in dense crowds
- Aerial angles with partial body visibility
- White ihram clothing blending together
Claude Vision understands scene context and can estimate
crowd density even when individual detection fails.
Integration:
Called from PerceptionAgent.process_frame() — replaces or
supplements the YOLO person_count with a more accurate value.
"""
import base64
import json
import time
import cv2
import numpy as np
from typing import Optional
class VisionCountAgent:
    """Crowd counter that periodically asks Claude Vision for a person count.

    On every ``SAMPLE_EVERY``-th frame (and never more often than once per
    ``MIN_INTERVAL_SEC`` seconds) the frame is JPEG-encoded and sent to the
    Anthropic Messages API. Between samples, and whenever a call fails, the
    last successful estimate is returned unchanged.
    """

    SAMPLE_EVERY = 60        # call Claude every N frames (~2 sec at 30fps)
    MIN_INTERVAL_SEC = 1.5   # minimum seconds between API calls (rate limit safety)
    MODEL = 'claude-opus-4-6'  # model id passed to messages.create(); overridable
    DENSITY_LEVELS = ('sparse', 'moderate', 'dense', 'critical')
    ZONE_KEYS = ('top', 'bottom', 'left', 'right')

    SYSTEM_PROMPT = (
        "You are an expert crowd analysis system for Hajj safety management. "
        "You receive aerial/CCTV footage frames from the Grand Mosque area. "
        "Your job is to count pilgrims accurately and assess crowd risk."
    )
    USER_PROMPT = (
        "Analyze this aerial crowd image from the Grand Mosque (Masjid al-Haram).\n\n"
        "Count ALL visible persons including:\n"
        "- Partially visible people at edges\n"
        "- People in white ihram clothing that may blend together\n"
        "- People in dense clusters — estimate the cluster size\n"
        "- People partially occluded by others\n\n"
        "Respond ONLY with valid JSON, no markdown:\n"
        "{\n"
        ' "person_count": <integer — total visible persons>,\n'
        ' "confidence": <0.0-1.0 — your counting confidence>,\n'
        ' "crowd_density": "sparse|moderate|dense|critical",\n'
        ' "zones": {\n'
        ' "top": <count>, "bottom": <count>, "left": <count>, "right": <count>\n'
        " },\n"
        ' "notes": "<any important observations>"\n'
        "}"
    )

    def __init__(self, api_key: str):
        """
        api_key: Anthropic API key
        """
        # Imported here rather than at module level, so the SDK is only
        # required when an agent is actually constructed.
        import anthropic

        self.client = anthropic.Anthropic(api_key=api_key)
        self.name = 'VisionCountAgent'
        self.aisa_layer = 'Reasoning (Vision)'
        self._frame_counter = 0
        self._last_count = 0
        self._last_confidence = 0.0
        self._last_density = 'sparse'
        self._last_zones = {'top': 0, 'bottom': 0, 'left': 0, 'right': 0}
        self._last_call_time = 0.0
        self._call_log = []  # one entry per successful API call, for evaluation
        print('👁️ [VisionCountAgent] Ready — Claude Vision crowd counting')
        print(f' Sampling every {self.SAMPLE_EVERY} frames (~{self.SAMPLE_EVERY/30:.1f}s at 30fps)')

    # ── Private helpers ───────────────────────────────────────────────
    @staticmethod
    def _to_count(value) -> int:
        """Coerce *value* to a non-negative int, raising ValueError otherwise."""
        # bool is an int subclass; a true/false reply is not a count.
        if isinstance(value, bool):
            raise ValueError(f'Not a valid count: {value!r}')
        try:
            number = int(value)
        except (TypeError, ValueError, OverflowError) as exc:
            raise ValueError(f'Not a valid count: {value!r}') from exc
        if number < 0:
            raise ValueError(f'Not a valid count: {value!r}')
        return number

    @classmethod
    def _normalize_result(cls, result) -> dict:
        """Validate a decoded model reply and coerce its fields.

        Returns a copy of *result* in which:
          - ``person_count`` is a non-negative int (required),
          - ``confidence`` is a float clamped to [0.0, 1.0], or absent if unusable,
          - ``crowd_density`` is one of ``DENSITY_LEVELS``, or absent if unusable,
          - ``zones`` holds only the ``ZONE_KEYS`` entries that are valid counts,
            or is absent if the reply's value was not an object,
          - ``notes`` is always a string.

        Raises:
            ValueError: if the reply is not a JSON object or has no usable
                ``person_count``.
        """
        if not isinstance(result, dict):
            raise ValueError('Response is not a JSON object')
        if 'person_count' not in result:
            raise ValueError('Missing person_count in response')

        clean = dict(result)
        clean['person_count'] = cls._to_count(result['person_count'])

        if 'confidence' in clean:
            try:
                conf = float(clean['confidence'])
            except (TypeError, ValueError):
                conf = None
            # `conf != conf` is only true for NaN, which json.loads accepts.
            if conf is None or conf != conf:
                del clean['confidence']
            else:
                clean['confidence'] = min(1.0, max(0.0, conf))

        density = clean.get('crowd_density')
        if isinstance(density, str) and density.strip().lower() in cls.DENSITY_LEVELS:
            clean['crowd_density'] = density.strip().lower()
        else:
            clean.pop('crowd_density', None)

        zones = clean.get('zones')
        if isinstance(zones, dict):
            usable = {}
            for key in cls.ZONE_KEYS:
                if key not in zones:
                    continue
                try:
                    usable[key] = cls._to_count(zones[key])
                except ValueError:
                    # Drop a bad entry; the caller keeps its last value.
                    continue
            clean['zones'] = usable
        else:
            clean.pop('zones', None)

        notes = clean.get('notes')
        clean['notes'] = '' if notes is None else str(notes)
        return clean

    def _frame_to_base64(self, frame: np.ndarray) -> str:
        """Convert BGR numpy frame to base64 JPEG string.

        Raises:
            ValueError: if OpenCV reports that JPEG encoding failed.
        """
        # Resize to reduce API payload — 640px wide is enough for counting
        h, w = frame.shape[:2]
        if w > 640:
            scale = 640 / w
            # max(1, ...) guards against a zero height on extremely wide frames.
            frame = cv2.resize(frame, (640, max(1, int(h * scale))))
        ok, buffer = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 85])
        if not ok:
            raise ValueError('JPEG encoding failed')
        return base64.standard_b64encode(buffer).decode('utf-8')

    def _call_claude(self, frame: np.ndarray) -> Optional[dict]:
        """Send frame to Claude Vision and parse response.

        Returns the validated reply (see ``_normalize_result``), or ``None``
        if encoding, the API call, JSON decoding or validation failed. The
        failure is printed, never raised, so the caller's frame loop keeps
        running.
        """
        try:
            img_b64 = self._frame_to_base64(frame)
            response = self.client.messages.create(
                model=self.MODEL,
                max_tokens=300,
                system=self.SYSTEM_PROMPT,
                messages=[{
                    'role': 'user',
                    'content': [
                        {
                            'type': 'image',
                            'source': {
                                'type': 'base64',
                                'media_type': 'image/jpeg',
                                'data': img_b64,
                            },
                        },
                        {
                            'type': 'text',
                            'text': self.USER_PROMPT,
                        },
                    ],
                }],
            )
            raw = response.content[0].text.strip()
            # Strip markdown fences if model adds them
            raw = raw.replace('```json', '').replace('```', '').strip()
            # Validate here, inside the try, so a malformed reply cannot
            # raise later in get_count().
            return self._normalize_result(json.loads(raw))
        except json.JSONDecodeError as e:
            print(f' [VisionCountAgent] JSON parse error: {e}')
            return None
        except Exception as e:
            # Deliberate best-effort boundary: any failure falls back to the
            # last known count instead of stopping the pipeline.
            print(f' [VisionCountAgent] API error: {e}')
            return None

    # ── Public API ────────────────────────────────────────────────────
    def get_count(self, frame: np.ndarray) -> dict:
        """
        Main method — call every frame.

        Returns accurate count from Claude every SAMPLE_EVERY frames,
        otherwise returns last known count instantly (no API call).

        Returns dict:
            person_count  : int
            confidence    : float
            crowd_density : str
            zones         : dict (a copy; safe for the caller to modify)
            from_vision   : bool — True only if this frame got a fresh,
                            successfully parsed Claude Vision result
            notes         : str
        """
        self._frame_counter += 1
        now = time.time()
        should_sample = (
            self._frame_counter % self.SAMPLE_EVERY == 0
            and (now - self._last_call_time) >= self.MIN_INTERVAL_SEC
        )
        result = None
        if should_sample:
            print(f' 👁️ [VisionCountAgent] Sampling frame {self._frame_counter}...')
            # Stamp the attempt, not just the success, so a failing API is
            # also throttled by MIN_INTERVAL_SEC.
            self._last_call_time = now
            t0 = time.time()
            result = self._call_claude(frame)
            elapsed = round(time.time() - t0, 2)
            if result is not None:
                self._last_count = result['person_count']
                self._last_confidence = result.get('confidence', 0.8)
                self._last_density = result.get('crowd_density', 'moderate')
                # Zones missing from the reply keep their last known value.
                self._last_zones = {**self._last_zones, **result.get('zones', {})}
                # Log for evaluation
                self._call_log.append({
                    'frame': self._frame_counter,
                    'count': self._last_count,
                    'confidence': self._last_confidence,
                    'density': self._last_density,
                    'latency_sec': elapsed,
                    'notes': result.get('notes', ''),
                })
                print(
                    f' 👁️ [VisionCountAgent] → {self._last_count} persons | '
                    f'density={self._last_density} | '
                    f'conf={self._last_confidence:.2f} | {elapsed}s'
                )
            else:
                print(f' 👁️ [VisionCountAgent] API failed — using last count: {self._last_count}')
        return {
            'person_count': self._last_count,
            'confidence': self._last_confidence,
            'crowd_density': self._last_density,
            'zones': dict(self._last_zones),
            'from_vision': result is not None,
            'notes': self._call_log[-1]['notes'] if self._call_log else '',
        }

    def get_evaluation_summary(self) -> dict:
        """Stats for the evaluation section of the capstone.

        Returns ``{'total_calls': 0}`` when no API call has succeeded yet;
        otherwise the call count, latency and count aggregates, and a
        per-density tally over the successful calls.
        """
        if not self._call_log:
            return {'total_calls': 0}
        latencies = [c['latency_sec'] for c in self._call_log]
        counts = [c['count'] for c in self._call_log]
        return {
            'total_calls': len(self._call_log),
            'avg_latency_sec': round(sum(latencies) / len(latencies), 2),
            'max_latency_sec': round(max(latencies), 2),
            'avg_count': round(sum(counts) / len(counts), 1),
            'max_count': max(counts),
            'density_dist': {
                d: sum(1 for c in self._call_log if c['density'] == d)
                for d in self.DENSITY_LEVELS
            },
        }