Spaces:
Running
Running
| """ | |
| HaramGuard β PerceptionAgent | |
| ============================== | |
| AISA Layer : Tool & Environment Layer | |
| Design Pattern : Tool Use β YOLO Detection + Spatial Grid Analysis | |
| Detection strategy: | |
| - YOLO11l β bounding boxes + tracking IDs + spacing (fast, every frame) | |
| - Spatial Grid β 3x3 zone analysis for hotspot detection (UQU research-based) | |
| Why spatial grid? | |
| Based on Umm Al-Qura University research on Haram crowd models: | |
| A global person_count of 47 spread evenly is safe. | |
| 47 persons clustered in one corner (e.g. Mataf bottleneck) is dangerous. | |
| The grid catches local density spikes that the global count misses entirely. | |
| Grid design: frame divided into 3Γ3 zones. | |
| Each cell threshold = HIGH_COUNT / 4 (~12 persons). | |
| If any single cell exceeds threshold β hotspot flagged β RiskAgent Path 4 fires. | |
| """ | |
| import time | |
| import numpy as np | |
| from ultralytics import YOLO | |
| from scipy.spatial.distance import cdist | |
| from typing import Optional, Tuple | |
| from core.models import FrameResult | |
| from agents.vision_count_agent import VisionCountAgent | |
| class PerceptionAgent: | |
| # ββ Guardrails ββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| MAX_PERSONS = 1000 # GR-1: cap implausible counts | |
| MAX_DENSITY = 50.0 # GR-2: cap anomalous density scores | |
| # ββ Spatial grid (UQU research-based) ββββββββββββββββββββββββββββ | |
| GRID_ROWS = 3 | |
| GRID_COLS = 3 | |
| # Zone labels for dashboard / CoordinatorAgent context | |
| ZONE_LABELS = { | |
| (0,0): 'top-left', (0,1): 'top-center', (0,2): 'top-right', | |
| (1,0): 'mid-left', (1,1): 'center', (1,2): 'mid-right', | |
| (2,0): 'bottom-left', (2,1): 'bottom-center', (2,2): 'bottom-right', | |
| } | |
| def __init__(self, | |
| model_path: str = 'yolo11l.pt', | |
| anthropic_key: Optional[str] = None): | |
| self.name = 'PerceptionAgent' | |
| self.aisa_layer = 'Tool & Environment Layer' | |
| self.model = YOLO(model_path) | |
| self.frame_id = 0 | |
| self.vision = None | |
| if anthropic_key: | |
| self.vision = VisionCountAgent(api_key=anthropic_key) | |
| print('π [PerceptionAgent] Hybrid mode β YOLO11l + spatial grid analysis') | |
| else: | |
| print(f'π [PerceptionAgent] YOLO11l + spatial grid β {model_path}') | |
| # ββ Spatial grid (UQU research) βββββββββββββββββββββββββββββββββββ | |
| def _compute_spatial_grid( | |
| self, | |
| boxes: list, | |
| h: int, | |
| w: int | |
| ) -> Tuple[np.ndarray, int, str]: | |
| """ | |
| Divide frame into 3Γ3 grid, count persons per cell. | |
| Based on UQU (Umm Al-Qura University) Haram crowd research: | |
| density maps and heat maps reveal local clustering that global | |
| counts miss β especially at Mataf bottlenecks and corridor choke points. | |
| Returns: | |
| grid : 3Γ3 numpy array of person counts per cell | |
| grid_max : highest count in any single cell | |
| hotspot_zone : label of the most crowded cell (e.g. 'center') | |
| """ | |
| grid = np.zeros((self.GRID_ROWS, self.GRID_COLS), dtype=int) | |
| cell_h = h / self.GRID_ROWS | |
| cell_w = w / self.GRID_COLS | |
| for box in boxes: | |
| cx = (box['x1'] + box['x2']) / 2.0 | |
| cy = (box['y1'] + box['y2']) / 2.0 | |
| col = min(int(cx / cell_w), self.GRID_COLS - 1) | |
| row = min(int(cy / cell_h), self.GRID_ROWS - 1) | |
| grid[row, col] += 1 | |
| grid_max = int(grid.max()) if grid.size > 0 else 0 | |
| hot_row, hot_col = np.unravel_index(grid.argmax(), grid.shape) | |
| hotspot_zone = self.ZONE_LABELS.get((hot_row, hot_col), 'unknown') | |
| return grid, grid_max, hotspot_zone | |
| # ββ Main processing βββββββββββββββββββββββββββββββββββββββββββββββ | |
| def process_frame(self, frame: np.ndarray) -> FrameResult: | |
| flags = [] | |
| h, w = frame.shape[:2] | |
| # ββ YOLO: bounding boxes + tracking ββββββββββββββββββββββββββ | |
| det = self.model.track( | |
| frame, | |
| persist=True, | |
| imgsz=1280, | |
| classes=[0], | |
| conf=0.15, | |
| iou=0.45, | |
| tracker='botsort.yaml', | |
| verbose=False | |
| )[0] | |
| boxes_raw = det.boxes | |
| boxes, centers = [], [] | |
| track_ids = [] | |
| if boxes_raw is not None: | |
| for box in boxes_raw: | |
| x1, y1, x2, y2 = [int(v) for v in box.xyxy[0].tolist()] | |
| conf = float(box.conf[0]) | |
| boxes.append({'x1': x1, 'y1': y1, 'x2': x2, 'y2': y2, 'conf': conf}) | |
| centers.append([(x1 + x2) / 2, (y1 + y2) / 2]) | |
| if box.id is not None: | |
| track_ids.append(int(box.id[0])) | |
| yolo_count = len(boxes) | |
| # ββ Claude Vision: accurate count every 60 frames βββββββββββββ | |
| vision_result = None | |
| if self.vision: | |
| vision_result = self.vision.get_count(frame) | |
| # ββ Choose best count βββββββββββββββββββββββββββββββββββββββββ | |
| if vision_result and vision_result['person_count'] > 0: | |
| final_count = vision_result['person_count'] | |
| if vision_result['from_vision']: | |
| flags.append(f'vision_count:{final_count}(yolo:{yolo_count})') | |
| else: | |
| final_count = yolo_count | |
| # ββ Guardrail 1: impossible person count βββββββββββββββββββββ | |
| if final_count > self.MAX_PERSONS: | |
| flags.append(f'GR1_count_capped:{final_count}->{self.MAX_PERSONS}') | |
| final_count = self.MAX_PERSONS | |
| boxes = boxes[:self.MAX_PERSONS] | |
| centers = centers[:self.MAX_PERSONS] | |
| # ββ Average spacing βββββββββββββββββββββββββββββββββββββββββββ | |
| avg_spacing = 999.0 | |
| if len(centers) >= 2: | |
| c = np.array(centers) | |
| d = cdist(c, c) | |
| np.fill_diagonal(d, np.inf) | |
| avg_spacing = float(d.min(axis=1).mean()) | |
| # ββ Density score βββββββββββββββββββββββββββββββββββββββββββββ | |
| density = round(final_count / ((h * w) / 10_000), 4) | |
| # ββ Occupation ratio ββββββββββββββββββββββββββββββββββββββββββ | |
| frame_area = h * w | |
| box_area_sum = sum((b['x2']-b['x1']) * (b['y2']-b['y1']) for b in boxes) | |
| occupation_pct = round( | |
| min((box_area_sum / frame_area) * 100, 100.0), 2 | |
| ) if frame_area > 0 else 0.0 | |
| # ββ Guardrail 2: anomalous density βββββββββββββββββββββββββββ | |
| if density > self.MAX_DENSITY: | |
| flags.append(f'GR2_density_capped:{density:.1f}->{self.MAX_DENSITY}') | |
| density = self.MAX_DENSITY | |
| # ββ Spatial grid (UQU research) βββββββββββββββββββββββββββββββ | |
| # Detects local clustering: 47 persons in one corner is more | |
| # dangerous than 47 persons spread across the frame. | |
| grid, grid_max, hotspot_zone = self._compute_spatial_grid(boxes, h, w) | |
| if grid_max > 0: | |
| flags.append(f'grid_hotspot:{hotspot_zone}({grid_max}p)') | |
| # ββ Compression βββββββββββββββββββββββββββββββββββββββββββββββ | |
| if avg_spacing < 999 and density > 0: | |
| spacing_norm = min(avg_spacing / 120.0, 1.0) | |
| density_norm = min(density / 1.0, 1.0) | |
| compression_ratio = (1.0 - spacing_norm) * density_norm | |
| else: | |
| compression_ratio = 0.0 | |
| # ββ Distribution score ββββββββββββββββββββββββββββββββββββββββ | |
| if len(centers) >= 3: | |
| centers_arr = np.array(centers) | |
| x_var = np.var(centers_arr[:, 0]) | |
| y_var = np.var(centers_arr[:, 1]) | |
| total_variance = (x_var + y_var) / ((h * w) / 1000.0) | |
| distribution_score = min(total_variance, 1.0) | |
| else: | |
| distribution_score = 0.3 | |
| annotated = det.plot() | |
| self.frame_id += 1 | |
| return FrameResult( | |
| frame_id = self.frame_id, | |
| timestamp = time.time(), | |
| person_count = final_count, | |
| density_score = density, | |
| avg_spacing = round(avg_spacing, 2), | |
| boxes = boxes, | |
| annotated = annotated, | |
| guardrail_flags = flags, | |
| track_ids = track_ids, | |
| occupation_pct = occupation_pct, | |
| compression_ratio = round(compression_ratio, 4), | |
| flow_velocity = 0.0, | |
| distribution_score = round(distribution_score, 4), | |
| # ββ NEW: spatial grid fields ββββββββββββββββββββββββββββββ | |
| grid_counts = grid.tolist(), # 3Γ3 list for dashboard heat map | |
| grid_max = grid_max, # max persons in any single cell | |
| hotspot_zone = hotspot_zone, # label: 'center', 'top-left', etc. | |
| ) |