# UF_clean/backend/visualize.py
# (non-code page chrome removed: author "Subh775", commit "fixed density heatmap..", d17539c)
import os
import numpy as np
import cv2
import csv
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator
from export_json import export_json
# Formal MIS palette — shared hex colors for every chart in this module.
C_PRIMARY = "#1e293b"  # chart titles and emphasized annotation text
C_ACCENT = "#334155"   # axis labels, tick labels, secondary annotations
C_IN = "#059669"       # "incoming" slice of the direction pie
C_OUT = "#dc2626"      # "outgoing" slice of the direction pie
C_FLOW = "#2563eb"     # flow-over-time histogram bars
C_CONG = "#d97706"     # congestion trace / trend line
C_CONF = "#7c3aed"     # confidence-distribution histogram bars
C_BAR = "#0f766e"      # class-dominance bars
C_GRID = "#e2e8f0"     # gridlines and the visible axis spines
C_BG = "#ffffff"       # figure background white
def _style(ax, title, xlabel="", ylabel=""):
    """Apply the shared report look (title, labels, spines, y-grid) to *ax*."""
    ax.set_title(title, fontsize=13, fontweight="700", color=C_PRIMARY, pad=14)
    if xlabel:
        ax.set_xlabel(xlabel, fontsize=9, fontweight="600", color=C_ACCENT)
    if ylabel:
        ax.set_ylabel(ylabel, fontsize=9, fontweight="600", color=C_ACCENT)
    ax.tick_params(labelsize=8, colors=C_ACCENT)
    # Drop the top/right spines; recolor the two that remain visible.
    for side in ("top", "right"):
        ax.spines[side].set_visible(False)
    for side in ("left", "bottom"):
        ax.spines[side].set_color(C_GRID)
    ax.yaxis.grid(True, color=C_GRID, linewidth=0.6, alpha=0.8)
    ax.set_axisbelow(True)
def _save(fig, path, fmt="png"):
    """Write *fig* to disk (swapping the extension to .pdf when requested) and close it."""
    if fmt == "pdf":
        # Replace the trailing extension so the PDF does not reuse the .png name.
        stem = path.rsplit(".", 1)[0]
        path = stem + ".pdf"
    fig.savefig(path, dpi=200, bbox_inches="tight", facecolor=C_BG, edgecolor="none")
    plt.close(fig)
def direction_pie(total_in, total_out, out_dir, fmt="png"):
    """Render the incoming/outgoing pie chart.

    Returns the written filename, or None when no vehicles were counted.
    """
    total = total_in + total_out
    if total == 0:
        return None
    fig, ax = plt.subplots(figsize=(5, 5), facecolor=C_BG)
    # Wedge separators only make sense when both slices are non-empty.
    edge_w = 2.5 if (total_in > 0 and total_out > 0) else 0
    _, _, pct_labels = ax.pie(
        [total_in, total_out],
        labels=[f"Incoming ({total_in})", f"Outgoing ({total_out})"],
        autopct="%1.1f%%",
        startangle=90,
        colors=[C_IN, C_OUT],
        wedgeprops={"edgecolor": C_BG, "linewidth": edge_w},
        textprops={"fontsize": 10, "fontweight": "600", "color": C_PRIMARY},
    )
    # Restyle the percentage labels so they stand out inside the wedges.
    for lbl in pct_labels:
        lbl.set_fontsize(11)
        lbl.set_fontweight("700")
        lbl.set_color(C_BG)
    ax.set_title("Directional Split", fontsize=13, fontweight="700", color=C_PRIMARY, pad=16)
    ax.text(0, -1.35, f"Total: {total} vehicles", ha="center", fontsize=9, color=C_ACCENT, fontweight="500")
    _save(fig, os.path.join(out_dir, "direction_pie.png"), fmt)
    return "direction_pie.pdf" if fmt == "pdf" else "direction_pie.png"
def flow_histogram(flow_times, out_dir, fmt="png"):
    """Histogram of line-crossing timestamps, annotated with the busiest bin.

    Returns the written filename, or None when no crossings were recorded.
    """
    if not flow_times:
        return None
    fig, ax = plt.subplots(figsize=(9, 4), facecolor=C_BG)
    # Bin count follows the number of distinct timestamps, clamped to [5, 30].
    n_bins = max(5, min(30, len(set(flow_times))))
    counts, edges, _ = ax.hist(flow_times, bins=n_bins, color=C_FLOW, alpha=0.85,
                               edgecolor=C_BG, linewidth=0.8)
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))
    _style(ax, "Traffic Flow Over Time", "Time (seconds)", "Vehicles Crossed")
    # Annotate the midpoint of the tallest bin.
    busiest = int(np.argmax(counts))
    center = (edges[busiest] + edges[busiest + 1]) / 2
    ax.text(0.98, 0.95, f"Peak: {int(counts[busiest])} vehicles at {center:.1f}s",
            transform=ax.transAxes, ha="right", va="top", fontsize=8, color=C_ACCENT,
            bbox=dict(boxstyle="round,pad=0.4", facecolor="#f8fafc", edgecolor=C_GRID))
    _save(fig, os.path.join(out_dir, "flow_over_time.png"), fmt)
    return "flow_over_time.pdf" if fmt == "pdf" else "flow_over_time.png"
def congestion_chart(congestion, out_dir, fmt="png"):
    """Per-frame active-vehicle trace with a moving-average trend line.

    Returns the written filename, or None when the trace is empty.
    """
    if not congestion:
        return None
    fig, ax = plt.subplots(figsize=(10, 4), facecolor=C_BG)
    frames = range(len(congestion))
    # Faint raw signal underneath the smoothed curve.
    ax.fill_between(frames, congestion, alpha=0.08, color=C_CONG)
    ax.plot(frames, congestion, alpha=0.25, color=C_CONG, linewidth=0.5)
    # Moving-average window ~10% of the trace length, clamped to [3, 30] frames.
    window = min(30, max(3, len(congestion) // 10))
    trend = np.convolve(congestion, np.ones(window) / window, mode="same")
    ax.plot(frames, trend, linewidth=2, color=C_CONG)
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))
    _style(ax, "Congestion Index", "Frame", "Active Vehicles")
    mean_load = np.mean(congestion)
    peak_load = max(congestion)
    ax.axhline(mean_load, color=C_ACCENT, linewidth=0.8, linestyle="--", alpha=0.5)
    ax.text(0.98, 0.95, f"Peak: {peak_load} | Avg: {mean_load:.1f}",
            transform=ax.transAxes, ha="right", va="top", fontsize=8, color=C_ACCENT,
            bbox=dict(boxstyle="round,pad=0.4", facecolor="#f8fafc", edgecolor=C_GRID))
    _save(fig, os.path.join(out_dir, "congestion_index.png"), fmt)
    return "congestion_index.pdf" if fmt == "pdf" else "congestion_index.png"
def class_dominance(class_in, class_out, model_classes, out_dir, fmt="png"):
    """Bar chart of combined in+out counts per vehicle class, largest first.

    Returns the written filename, or None when nothing was counted.
    """
    combined = {}
    for key in set(class_in) | set(class_out):
        combined[key] = class_in.get(key, 0) + class_out.get(key, 0)
    if not combined or sum(combined.values()) == 0:
        return None
    ranked = sorted(combined.items(), key=lambda kv: kv[1], reverse=True)
    # NOTE(review): assumes class keys are int-convertible ids — confirm upstream.
    labels = [model_classes.get(int(cid), f"cls_{cid}") for cid, _ in ranked]
    counts = [cnt for _, cnt in ranked]
    fig, ax = plt.subplots(figsize=(10, 4.5), facecolor=C_BG)
    n = len(labels)
    # Bars narrow as the class count grows, clamped to [0.15, 0.45].
    width = min(0.45, max(0.15, 0.6 / max(n, 1)))
    bars = ax.bar(range(n), counts, width=width, color=C_BAR, edgecolor=C_BG, linewidth=0.5, zorder=3)
    ax.set_xticks(range(n))
    ax.set_xticklabels(labels, rotation=35, ha="right", fontsize=9, fontweight="500")
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))
    # Print each count just above its bar.
    for rect, cnt in zip(bars, counts):
        ax.text(rect.get_x() + rect.get_width() / 2, rect.get_height() + 0.15,
                str(cnt), ha="center", va="bottom", fontsize=9, fontweight="700", color=C_PRIMARY)
    _style(ax, "Class Dominance", "", "Vehicle Count")
    ax.text(0.98, 0.95, f"Total: {sum(counts)} vehicles | {n} classes detected",
            transform=ax.transAxes, ha="right", va="top", fontsize=8, color=C_ACCENT,
            bbox=dict(boxstyle="round,pad=0.4", facecolor="#f8fafc", edgecolor=C_GRID))
    _save(fig, os.path.join(out_dir, "class_dominance.png"), fmt)
    return "class_dominance.pdf" if fmt == "pdf" else "class_dominance.png"
def confidence_dist(conf_scores, out_dir, fmt="png"):
    """Histogram of per-detection confidence scores with a mean marker.

    Returns the written filename, or None when no scores were collected.
    """
    if not conf_scores:
        return None
    fig, ax = plt.subplots(figsize=(9, 4), facecolor=C_BG)
    ax.hist(conf_scores, bins=30, color=C_CONF, alpha=0.85, edgecolor=C_BG, linewidth=0.8)
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))
    _style(ax, "Detection Confidence Distribution", "Confidence Score", "Detections")
    avg = np.mean(conf_scores)
    mid = np.median(conf_scores)
    ax.axvline(avg, color=C_PRIMARY, linewidth=1, linestyle="--", alpha=0.6)
    ax.text(0.98, 0.95, f"Mean: {avg:.3f} | Median: {mid:.3f} | N={len(conf_scores)}",
            transform=ax.transAxes, ha="right", va="top", fontsize=8, color=C_ACCENT,
            bbox=dict(boxstyle="round,pad=0.4", facecolor="#f8fafc", edgecolor=C_GRID))
    _save(fig, os.path.join(out_dir, "confidence_dist.png"), fmt)
    return "confidence_dist.pdf" if fmt == "pdf" else "confidence_dist.png"
def export_csv(raw_events, out_dir):
    """Dump the raw event rows to raw_data.csv under *out_dir*.

    Returns the filename, or None when there is nothing beyond a header row.
    """
    # A single row is treated as header-only: nothing worth exporting.
    if not raw_events or len(raw_events) <= 1:
        return None
    out_path = os.path.join(out_dir, "raw_data.csv")
    with open(out_path, mode="w", newline="") as handle:
        csv.writer(handle).writerows(raw_events)
    return "raw_data.csv"
def spatial_heatmap(heatmap_points, video_path, out_dir, fmt="png"):
    """
    Confidence-Weighted Spatial Density Map (xAI / Explainability Overlay).
    Each detection contributes a Gaussian kernel to the accumulation grid,
    weighted by the model's own confidence score for that detection.
    This means the heatmap directly encodes WHERE the model is most certain
    vehicles exist β€” making it a faithful spatial explanation of the detector's
    attention, without requiring backpropagation.
    This is distinct from Grad-CAM (which needs a differentiable classifier) and
    is the correct xAI approach for a post-processing YOLO/OpenVINO deployment
    where gradients are not available at runtime.
    Algorithm:
    1. For each detection (cx, cy, conf): stamp a 2D Gaussian kernel of
       radius proportional to frame size, weighted by conf.
    2. Accumulate all weighted kernels into a float32 density grid.
    3. Apply a mild additional Gaussian blur for visual smoothness.
    4. Normalize [0, 255], apply COLORMAP_JET.
    5. Blend over the original frame only where density > threshold.
    6. Annotate with a legend showing the confidence scale.
    """
    # Need detection points AND a readable source video to draw on.
    if not heatmap_points or not video_path or not os.path.exists(video_path):
        return None
    # Grab the first frame as the background for the overlay.
    cap = cv2.VideoCapture(video_path)
    ret, frame = cap.read()
    cap.release()
    if not ret:
        return None
    h, w = frame.shape[:2]
    density = np.zeros((h, w), dtype=np.float32)
    # Kernel radius: ~3% of the shorter dimension, min 20px
    kernel_r = max(20, int(min(h, w) * 0.03))
    kernel_size = kernel_r * 2 + 1
    # Pre-build a unit Gaussian kernel to stamp for each detection
    _kx = cv2.getGaussianKernel(kernel_size, kernel_r / 2.5)
    _unit_kernel = (_kx @ _kx.T).astype(np.float32)  # shape (ks, ks)
    for pt in heatmap_points:
        cx, cy = int(pt[0]), int(pt[1])
        # Support both old [cx, cy] and new [cx, cy, conf] formats
        conf = float(pt[2]) if len(pt) > 2 else 1.0
        # Kernel bounding box (clip to frame)
        x0 = max(0, cx - kernel_r)
        y0 = max(0, cy - kernel_r)
        x1 = min(w, cx + kernel_r + 1)
        y1 = min(h, cy + kernel_r + 1)
        # Corresponding slice in the kernel
        kx0 = x0 - (cx - kernel_r)
        ky0 = y0 - (cy - kernel_r)
        kx1 = kx0 + (x1 - x0)
        ky1 = ky0 + (y1 - y0)
        # Guard skips points whose kernel window falls entirely outside the frame.
        if x1 > x0 and y1 > y0 and kx1 > kx0 and ky1 > ky0:
            density[y0:y1, x0:x1] += conf * _unit_kernel[ky0:ky1, kx0:kx1]
    # Mild additional smoothing pass
    density = cv2.GaussianBlur(density, (31, 31), 0)
    max_val = np.max(density)
    # All-zero density (e.g. every point clipped out) — nothing to render.
    if max_val <= 0:
        return None
    density_norm = (density / max_val * 255.0).astype(np.uint8)
    heatmap_color = cv2.applyColorMap(density_norm, cv2.COLORMAP_JET)
    # Blend: only paint where density is meaningful (>4% of max)
    threshold = int(0.04 * 255)
    mask = density_norm > threshold
    overlay = frame.copy()
    # Smooth alpha blend using the density as alpha weight
    alpha_map = (density_norm.astype(np.float32) / 255.0) * 0.72
    alpha_map = np.clip(alpha_map, 0, 0.72)
    # Per-channel blend: heatmap where mask is set, untouched frame elsewhere.
    for c in range(3):
        overlay[:, :, c] = np.where(
            mask,
            (1.0 - alpha_map) * frame[:, :, c] + alpha_map * heatmap_color[:, :, c],
            frame[:, :, c]
        ).astype(np.uint8)
    # ── xAI legend bar ──────────────────────────────────────────────────────
    # Draw a horizontal colorbar with labels in the bottom-left corner
    bar_w, bar_h = min(240, w // 4), 14
    # NOTE(review): bar_y goes negative on very short frames (h < 50), which
    # would make the panel slices below wrap around — confirm minimum frame size.
    bar_x, bar_y = 16, h - bar_h - 36
    gradient = np.tile(np.arange(256, dtype=np.uint8), (bar_h, 1))
    gradient_color = cv2.applyColorMap(gradient, cv2.COLORMAP_JET)  # (bar_h, 256, 3)
    gradient_resized = cv2.resize(gradient_color, (bar_w, bar_h))
    # Semi-transparent background panel behind the legend
    panel_pad = 10
    panel = overlay[bar_y - panel_pad : bar_y + bar_h + panel_pad + 18,
                    bar_x - panel_pad : bar_x + bar_w + panel_pad]
    if panel.size > 0:
        # Darken the panel region (65% dark mix) so legend text stays legible.
        dark = np.full_like(panel, 15)
        overlay[bar_y - panel_pad : bar_y + bar_h + panel_pad + 18,
                bar_x - panel_pad : bar_x + bar_w + panel_pad] = cv2.addWeighted(panel, 0.35, dark, 0.65, 0)
    overlay[bar_y : bar_y + bar_h, bar_x : bar_x + bar_w] = gradient_resized
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_s = 0.35
    thickness = 1
    label_color = (220, 220, 220)
    cv2.putText(overlay, "Low Confidence", (bar_x, bar_y + bar_h + 14),
                font, font_s, label_color, thickness, cv2.LINE_AA)
    high_label = "High Confidence"
    # Right-align the high-confidence label with the end of the bar.
    (tw, _), _ = cv2.getTextSize(high_label, font, font_s, thickness)
    cv2.putText(overlay, high_label, (bar_x + bar_w - tw, bar_y + bar_h + 14),
                font, font_s, label_color, thickness, cv2.LINE_AA)
    # Title label above the bar
    title_label = "Detection Confidence Density (xAI)"
    (ttw, _), _ = cv2.getTextSize(title_label, font, 0.38, thickness)
    cv2.putText(overlay, title_label,
                (bar_x, bar_y - panel_pad + 8),
                font, 0.38, (180, 180, 180), thickness, cv2.LINE_AA)
    # ── end legend ──────────────────────────────────────────────────────────
    if fmt == "pdf":
        # PDF output goes through matplotlib so the image embeds as a report page.
        overlay_rgb = cv2.cvtColor(overlay, cv2.COLOR_BGR2RGB)
        fig, ax = plt.subplots(figsize=(12, 7), facecolor=C_BG)
        ax.imshow(overlay_rgb)
        ax.set_title("Detection Confidence Density Map Β· xAI Spatial Explanation",
                     fontsize=13, fontweight="700", color=C_PRIMARY, pad=14)
        ax.axis('off')
        fig.text(0.5, 0.01,
                 "Brighter regions = higher accumulated detector confidence. "
                 "Generated from confidence-weighted Gaussian kernel density estimation.",
                 ha="center", fontsize=7, color=C_ACCENT)
        path = os.path.join(out_dir, "heatmap.pdf")
        fig.savefig(path, dpi=200, bbox_inches="tight", facecolor=C_BG, edgecolor="none")
        plt.close(fig)
        return "heatmap.pdf"
    else:
        path = os.path.join(out_dir, "heatmap.png")
        cv2.imwrite(path, overlay)
        return "heatmap.png"
def generate_all(data, model_classes, out_dir, report_format="png"):
    """Run every chart/export producer and return the list of written filenames.

    Producers that return None (nothing to plot/export) are skipped.
    """
    os.makedirs(out_dir, exist_ok=True)
    plt.rcParams.update({
        "font.family": "sans-serif",
        "font.sans-serif": ["DejaVu Sans", "Arial", "Helvetica"],
        "axes.unicode_minus": False,
    })
    fmt = report_format
    total_in = sum(data["class_in"].values())
    total_out = sum(data["class_out"].values())
    produced = []

    def _collect(name):
        # Keep only producers that actually wrote a file.
        if name:
            produced.append(name)

    _collect(direction_pie(total_in, total_out, out_dir, fmt))
    _collect(flow_histogram(data.get("flow_times", []), out_dir, fmt))
    _collect(congestion_chart(data.get("congestion", []), out_dir, fmt))
    _collect(class_dominance(data["class_in"], data["class_out"], model_classes, out_dir, fmt))
    _collect(confidence_dist(data.get("conf_scores", []), out_dir, fmt))
    _collect(spatial_heatmap(data.get("heatmap_points", []), data.get("video_path"), out_dir, fmt))
    if data.get("export_csv", False):
        _collect(export_csv(data.get("raw_events", []), out_dir))
    if data.get("export_json", False):
        _collect(export_json(
            data,
            data.get("video_meta", {}),
            data.get("engine_config", {}),
            out_dir,
        ))
    return produced