# Smart-Crowd-Detector / src / heatmap / generator.py
# Ali Abdullah
# Fix requirements.txt encoding for HF
# 98a79a7
"""
Crowd Density Heatmap Generator
Implements REQ-4, REQ-5: Generate and visualize crowd density zones
"""
import cv2
import numpy as np
from typing import List, Dict, Tuple
import time
import logging
logger = logging.getLogger(__name__)
class HeatmapGenerator:
    """
    Generate crowd density heatmaps

    Satisfies SRS Requirements:
    - REQ-4: Generate localized density zones
    - REQ-5: Apply color map representing crowd concentration
    """

    # Used when config has no ['constraints']['max_heatmap_delay'] (SRS: <= 1.5s per frame)
    _DEFAULT_MAX_HEATMAP_DELAY = 1.5
    # Hard lower bound applied to every adaptively computed kernel radius
    _MIN_ADAPTIVE_RADIUS = 15
    # Adaptive radius = this fraction of the mean bbox side, before clipping
    _BBOX_TO_RADIUS = 0.8

    def __init__(self, config: Dict):
        """
        Initialize heatmap generator

        Args:
            config: Configuration dictionary. config['heatmap'] must provide
                'enabled', 'kernel_size', 'alpha' and 'colormap'; 'adaptive',
                'min_kernel_size', 'max_kernel_size' and 'blur_strength' are
                optional. config['constraints']['max_heatmap_delay'] is read
                lazily by generate_heatmap and is optional.

        Raises:
            KeyError: If a required config['heatmap'] entry is missing
        """
        self.config = config
        heatmap_cfg = config['heatmap']

        self.enabled = heatmap_cfg['enabled']
        self.kernel_size = heatmap_cfg['kernel_size']
        self.alpha = heatmap_cfg['alpha']
        self.colormap_name = heatmap_cfg['colormap']

        # Adaptive heatmap settings
        self.adaptive = heatmap_cfg.get('adaptive', True)
        self.min_kernel_size = heatmap_cfg.get('min_kernel_size', 30)
        self.max_kernel_size = heatmap_cfg.get('max_kernel_size', 150)
        self.blur_strength = heatmap_cfg.get('blur_strength', 0.6)

        # Map colormap name to OpenCV constant
        colormap_dict = {
            'jet': cv2.COLORMAP_JET,
            'hot': cv2.COLORMAP_HOT,
            'viridis': cv2.COLORMAP_VIRIDIS,
            'plasma': cv2.COLORMAP_PLASMA,
            'rainbow': cv2.COLORMAP_RAINBOW,
            'cool': cv2.COLORMAP_COOL,
        }
        # Case-insensitive lookup; an unknown name falls back to JET, but
        # loudly, so a config typo is visible in the logs.
        lookup_name = str(self.colormap_name).lower()
        if lookup_name not in colormap_dict:
            logger.warning(f"Unknown colormap '{self.colormap_name}', falling back to 'jet'")
        self.colormap = colormap_dict.get(lookup_name, cv2.COLORMAP_JET)

        # Performance tracking (one entry per successfully generated heatmap)
        self.generation_times = []

        logger.info(f"Heatmap Generator initialized: Enabled={self.enabled}")
        logger.info(f"Kernel size: {self.kernel_size}, Alpha: {self.alpha}, Colormap: {self.colormap_name}")
        logger.info(f"Adaptive mode: {self.adaptive}, Range: {self.min_kernel_size}-{self.max_kernel_size}")

    def _radius_for_size(self, size: float) -> int:
        """Map a mean bbox side length to a clipped integer kernel radius."""
        clipped = np.clip(size * self._BBOX_TO_RADIUS,
                          self.min_kernel_size,
                          self.max_kernel_size)
        return max(self._MIN_ADAPTIVE_RADIUS, int(clipped))

    def _frame_kernel_radius(self, detections: List[Dict]):
        """Return the frame-wide kernel radius (adaptive average, else configured size)."""
        if not self.adaptive:
            return self.kernel_size

        box_sizes = []
        for det in detections:
            try:
                bbox = det.get('bbox', [])
                if len(bbox) != 4:
                    continue
                x1, y1, x2, y2 = bbox
                box_width = max(0, x2 - x1)
                box_height = max(0, y2 - y1)
                # Only boxes with a positive area contribute to the average
                if box_width > 0 and box_height > 0:
                    box_sizes.append((box_width + box_height) / 2)
            except (AttributeError, KeyError, TypeError, ValueError) as e:
                logger.debug("Skipping invalid detection: %s", e)

        if not box_sizes:
            return self.kernel_size

        avg_box_size = sum(box_sizes) / len(box_sizes)
        kernel_radius = self._radius_for_size(avg_box_size)
        logger.debug("Adaptive kernel: avg=%.1f, radius=%d", avg_box_size, kernel_radius)
        return kernel_radius

    def _build_density_map(self, h: int, w: int, detections: List[Dict],
                           kernel_radius) -> np.ndarray:
        """Accumulate one confidence-weighted Gaussian blob per valid detection."""
        density_map = np.zeros((h, w), dtype=np.float32)

        for det in detections:
            try:
                center = det.get('center', [])
                bbox = det.get('bbox', [])
                if len(center) != 2 or len(bbox) != 4:
                    continue

                cx, cy = center
                # Validate on the raw values first so NaN / out-of-frame
                # centers are rejected before integer conversion.
                if not (0 <= cx < w and 0 <= cy < h):
                    logger.debug("Skipping out-of-bounds detection at (%s, %s)", cx, cy)
                    continue
                # Slice bounds must be integers; float centers used to raise
                # TypeError below and the detection was silently dropped.
                cx, cy = int(cx), int(cy)

                # Per-detection radius in adaptive mode, frame-wide otherwise
                det_kernel = kernel_radius
                if self.adaptive:
                    x1, y1, x2, y2 = bbox
                    det_size = (max(0, x2 - x1) + max(0, y2 - y1)) / 2
                    if det_size > 0:
                        det_kernel = self._radius_for_size(det_size)
                det_kernel = int(det_kernel)

                # ROI clamped to the frame
                y_min = max(0, cy - det_kernel)
                y_max = min(h, cy + det_kernel)
                x_min = max(0, cx - det_kernel)
                x_max = min(w, cx + det_kernel)
                if y_max <= y_min or x_max <= x_min:
                    continue

                det_sigma = det_kernel * self.blur_strength
                if det_sigma <= 0:
                    # A zero sigma would divide by zero and poison the map with NaN
                    continue

                # Offsets of every ROI pixel from the detection center
                x_grid, y_grid = np.meshgrid(np.arange(x_min, x_max) - cx,
                                             np.arange(y_min, y_max) - cy)
                gaussian = np.exp(-(x_grid**2 + y_grid**2) / (2 * det_sigma**2))

                # Use confidence as intensity multiplier for better visualization
                intensity = det.get('confidence', 1.0)
                density_map[y_min:y_max, x_min:x_max] += gaussian.astype(np.float32) * intensity
            except (AttributeError, KeyError, TypeError, ValueError, IndexError) as e:
                logger.debug("Error processing detection for heatmap: %s", e)

        return density_map

    def generate_heatmap(self, frame: np.ndarray, detections: List[Dict]) -> Tuple[np.ndarray, float]:
        """
        Generate crowd density heatmap with ADAPTIVE kernel sizing

        Args:
            frame: Input frame (blended with a 3-channel colour map via
                cv2.addWeighted, so presumably a 3-channel uint8 image -
                TODO confirm against callers)
            detections: List of detection dicts. Each needs 'center' (x, y)
                and 'bbox' (x1, y1, x2, y2); 'confidence' is optional and
                defaults to 1.0. Malformed or out-of-frame detections are
                skipped.

        Returns:
            heatmap_overlay: Frame with heatmap overlay. A copy of the input
                frame when there are no detections or generation fails; a
                black 480x640x3 image when the frame itself is None/empty.
            generation_time: Time taken to generate heatmap, in seconds
                (0.0 for an invalid frame)

        Implements:
            - REQ-4: Generate localized density zones
            - REQ-5: Apply color map for crowd concentration
            - ADAPTIVE: Auto-adjusts kernel size based on detection box dimensions
        """
        start_time = time.time()

        # Validate inputs
        if frame is None or frame.size == 0:
            logger.error("Invalid frame provided to heatmap generator")
            return np.zeros((480, 640, 3), dtype=np.uint8), 0.0

        # Nothing to draw: hand back an untouched copy
        if not detections:
            return frame.copy(), time.time() - start_time

        try:
            h, w = frame.shape[:2]

            kernel_radius = self._frame_kernel_radius(detections)
            # REQ-4: localized density zones
            density_map = self._build_density_map(h, w, detections, kernel_radius)

            # Normalize density map to 0-255
            peak = density_map.max()
            if peak > 0:
                density_u8 = (density_map / peak * 255).astype(np.uint8)
            else:
                density_u8 = density_map.astype(np.uint8)

            # Single smoothing pass; kernel side kept odd and within 11..21
            blur_size = max(11, min(21, int(kernel_radius) // 4))
            if blur_size % 2 == 0:
                blur_size += 1
            density_u8 = cv2.GaussianBlur(density_u8, (blur_size, blur_size), 0)

            # REQ-5: color map representing concentration
            heatmap_colored = cv2.applyColorMap(density_u8, self.colormap)

            # Overlay heatmap on original frame
            heatmap_overlay = cv2.addWeighted(
                frame,
                1 - self.alpha,
                heatmap_colored,
                self.alpha,
                0,
            )

            generation_time = time.time() - start_time
            self.generation_times.append(generation_time)

            # Check performance constraint. The limit is looked up tolerantly:
            # a missing 'constraints' section must not throw away a heatmap
            # that has already been rendered.
            constraints = self.config.get('constraints') or {}
            max_delay = constraints.get('max_heatmap_delay', self._DEFAULT_MAX_HEATMAP_DELAY)
            if generation_time > max_delay:
                logger.warning(
                    f"Heatmap generation exceeded constraint: "
                    f"{generation_time:.3f}s > {max_delay}s"
                )

            return heatmap_overlay, generation_time

        except Exception as e:
            # Best-effort boundary: never let a rendering failure break the
            # caller; log with traceback and return the frame unchanged.
            logger.exception("Heatmap generation error: %s", e)
            return frame.copy(), time.time() - start_time

    def generate_density_grid(self, frame: np.ndarray, detections: List[Dict],
                              grid_size: int = 50) -> np.ndarray:
        """
        Generate grid-based density visualization (alternative method)

        Args:
            frame: Input frame
            detections: List of detection dicts with a 'center' (x, y) entry.
                Detections without a usable center are skipped; centers
                outside the frame are clamped to the nearest edge cell.
            grid_size: Size of each grid cell in pixels (must be positive)

        Returns:
            grid_overlay: Copy of the frame with occupied cells tinted

        Raises:
            ValueError: If grid_size is not positive
        """
        if grid_size <= 0:
            raise ValueError(f"grid_size must be positive, got {grid_size}")

        h, w = frame.shape[:2]
        overlay = frame.copy()

        # Ceiling division: exactly enough cells to cover the frame, so no
        # zero-area cell exists for clamped detections to land in.
        grid_h = max(1, -(-h // grid_size))
        grid_w = max(1, -(-w // grid_size))
        density_grid = np.zeros((grid_h, grid_w), dtype=int)

        # Count detections in each grid cell
        for det in (detections or []):
            try:
                cx, cy = det['center']
                # int() so float centers index correctly; clamp at both ends
                # so negative coordinates cannot wrap around via negative indexing.
                grid_x = min(max(int(cx) // grid_size, 0), grid_w - 1)
                grid_y = min(max(int(cy) // grid_size, 0), grid_h - 1)
            except (AttributeError, KeyError, TypeError, ValueError, OverflowError) as e:
                logger.debug("Skipping invalid detection in density grid: %s", e)
                continue
            density_grid[grid_y, grid_x] += 1

        # Draw occupied cells with color intensity based on relative density
        max_density = max(int(density_grid.max()), 1)
        for gy, gx in np.argwhere(density_grid > 0):
            x1 = int(gx) * grid_size
            y1 = int(gy) * grid_size
            x2 = min(x1 + grid_size, w)
            y2 = min(y1 + grid_size, h)

            sub_img = overlay[y1:y2, x1:x2]
            if sub_img.size == 0:
                continue

            # Second channel grows and third channel shrinks with density.
            # NOTE(review): the previous comment said "Blue to red", which
            # these values produce in neither BGR nor RGB order - confirm
            # the intended palette.
            intensity = int(255 * (density_grid[gy, gx] / max_density))
            color = (0, intensity, 255 - intensity)

            # Draw semi-transparent rectangle (70% frame, 30% tint)
            rect = np.full_like(sub_img, color, dtype=np.uint8)
            overlay[y1:y2, x1:x2] = cv2.addWeighted(sub_img, 0.7, rect, 0.3, 0)

        return overlay

    def get_statistics(self) -> Dict:
        """
        Get heatmap generation statistics

        Returns:
            Dict with 'avg_generation_time' (mean of the last 100 recorded
            times), 'total_heatmaps', 'max_generation_time' and
            'min_generation_time'. All keys are always present; the values
            are zero when nothing has been recorded yet.
        """
        times = self.generation_times
        if not times:
            return {
                'avg_generation_time': 0.0,
                'total_heatmaps': 0,
                'max_generation_time': 0.0,
                'min_generation_time': 0.0,
            }

        return {
            'avg_generation_time': float(np.mean(times[-100:])),
            'total_heatmaps': len(times),
            'max_generation_time': max(times),
            'min_generation_time': min(times),
        }