"""Render the report artifacts (charts, spatial heatmap, CSV/JSON exports).

Every public chart function writes one file into ``out_dir`` and returns the
file's base name, or ``None`` when there is nothing to draw.
"""

import csv
import os

import cv2
import matplotlib
import numpy as np

# Select the non-interactive backend before pyplot is imported.
matplotlib.use("Agg")

import matplotlib.pyplot as plt
from matplotlib.ticker import MaxNLocator

from export_json import export_json

# Formal MIS palette
C_PRIMARY = "#1e293b"
C_ACCENT = "#334155"
C_IN = "#059669"
C_OUT = "#dc2626"
C_FLOW = "#2563eb"
C_CONG = "#d97706"
C_CONF = "#7c3aed"
C_BAR = "#0f766e"
C_GRID = "#e2e8f0"
C_BG = "#ffffff"


def _style(ax, title, xlabel="", ylabel=""):
    """Apply the shared title, axis-label, spine and grid styling to ``ax``."""
    ax.set_title(title, fontsize=13, fontweight="700", color=C_PRIMARY, pad=14)
    if xlabel:
        ax.set_xlabel(xlabel, fontsize=9, fontweight="600", color=C_ACCENT)
    if ylabel:
        ax.set_ylabel(ylabel, fontsize=9, fontweight="600", color=C_ACCENT)
    ax.tick_params(labelsize=8, colors=C_ACCENT)
    ax.spines["top"].set_visible(False)
    ax.spines["right"].set_visible(False)
    ax.spines["left"].set_color(C_GRID)
    ax.spines["bottom"].set_color(C_GRID)
    ax.yaxis.grid(True, color=C_GRID, linewidth=0.6, alpha=0.8)
    ax.set_axisbelow(True)


def _annotate(ax, text):
    """Draw the small rounded summary box in the top-right corner of ``ax``."""
    ax.text(
        0.98,
        0.95,
        text,
        transform=ax.transAxes,
        ha="right",
        va="top",
        fontsize=8,
        color=C_ACCENT,
        bbox=dict(boxstyle="round,pad=0.4", facecolor="#f8fafc", edgecolor=C_GRID),
    )


def _save(fig, path, fmt="png"):
    """Write ``fig`` to ``path`` and close it.

    When ``fmt`` is ``"pdf"`` the extension of ``path`` is replaced by
    ``.pdf``; any other value leaves ``path`` untouched.

    Returns:
        The base name of the file that was written.
    """
    if fmt == "pdf":
        path = os.path.splitext(path)[0] + ".pdf"
    try:
        fig.savefig(path, dpi=200, bbox_inches="tight", facecolor=C_BG, edgecolor="none")
    finally:
        # Close even when saving fails so figures do not pile up in pyplot.
        plt.close(fig)
    return os.path.basename(path)


def direction_pie(total_in, total_out, out_dir, fmt="png"):
    """Draw the incoming/outgoing split as a pie chart.

    Returns:
        ``"direction_pie.<ext>"``, or ``None`` when both totals add up to zero.
    """
    total = total_in + total_out
    if total == 0:
        return None

    fig, ax = plt.subplots(figsize=(5, 5), facecolor=C_BG)
    # A wedge border is only drawn when there are two wedges to separate.
    both_present = total_in > 0 and total_out > 0
    _, _, autotexts = ax.pie(
        [total_in, total_out],
        labels=[f"Incoming ({total_in})", f"Outgoing ({total_out})"],
        autopct="%1.1f%%",
        startangle=90,
        colors=[C_IN, C_OUT],
        wedgeprops={"edgecolor": C_BG, "linewidth": 2.5 if both_present else 0},
        textprops={"fontsize": 10, "fontweight": "600", "color": C_PRIMARY},
    )
    for label in autotexts:
        label.set_fontsize(11)
        label.set_fontweight("700")
        label.set_color(C_BG)

    ax.set_title("Directional Split", fontsize=13, fontweight="700", color=C_PRIMARY, pad=16)
    ax.text(0, -1.35, f"Total: {total} vehicles", ha="center", fontsize=9,
            color=C_ACCENT, fontweight="500")

    return _save(fig, os.path.join(out_dir, "direction_pie.png"), fmt)


def flow_histogram(flow_times, out_dir, fmt="png"):
    """Draw a histogram of crossing times and annotate the busiest bin.

    Returns:
        ``"flow_over_time.<ext>"``, or ``None`` when ``flow_times`` is empty.
    """
    if not flow_times:
        return None

    fig, ax = plt.subplots(figsize=(9, 4), facecolor=C_BG)
    # Between 5 and 30 bins, following the number of distinct timestamps.
    bins = min(30, max(5, len(set(flow_times))))
    counts, edges, _ = ax.hist(flow_times, bins=bins, color=C_FLOW, alpha=0.85,
                               edgecolor=C_BG, linewidth=0.8)
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))
    _style(ax, "Traffic Flow Over Time", "Time (seconds)", "Vehicles Crossed")

    peak_idx = int(np.argmax(counts))
    peak_time = (edges[peak_idx] + edges[peak_idx + 1]) / 2
    _annotate(ax, f"Peak: {int(counts[peak_idx])} vehicles at {peak_time:.1f}s")

    return _save(fig, os.path.join(out_dir, "flow_over_time.png"), fmt)


def congestion_chart(congestion, out_dir, fmt="png"):
    """Plot the per-sample congestion values with a moving-average overlay.

    Returns:
        ``"congestion_index.<ext>"``, or ``None`` when ``congestion`` is empty.
    """
    if not congestion:
        return None

    n = len(congestion)
    fig, ax = plt.subplots(figsize=(10, 4), facecolor=C_BG)
    x = range(n)
    ax.fill_between(x, congestion, alpha=0.08, color=C_CONG)
    ax.plot(x, congestion, alpha=0.25, color=C_CONG, linewidth=0.5)

    # np.convolve(mode="same") returns max(len(a), len(v)) samples, so a window
    # longer than the series would give a curve longer than x and make
    # ax.plot raise. Clamp the window to the series length.
    win = min(30, max(3, n // 10), n)
    smooth = np.convolve(congestion, np.ones(win) / win, mode="same")
    ax.plot(x, smooth, linewidth=2, color=C_CONG)

    ax.yaxis.set_major_locator(MaxNLocator(integer=True))
    _style(ax, "Congestion Index", "Frame", "Active Vehicles")

    avg = np.mean(congestion)
    peak = max(congestion)
    ax.axhline(avg, color=C_ACCENT, linewidth=0.8, linestyle="--", alpha=0.5)
    _annotate(ax, f"Peak: {peak} | Avg: {avg:.1f}")

    return _save(fig, os.path.join(out_dir, "congestion_index.png"), fmt)


def class_dominance(class_in, class_out, model_classes, out_dir, fmt="png"):
    """Draw a bar chart of combined in+out counts per class, largest first.

    ``class_in`` and ``class_out`` map a class id to a count; ``model_classes``
    is queried with ``.get(int(class_id), ...)`` for the display name, falling
    back to ``cls_<id>``.

    Returns:
        ``"class_dominance.<ext>"``, or ``None`` when every count is zero.
    """
    totals = {
        key: class_in.get(key, 0) + class_out.get(key, 0)
        for key in set(class_in) | set(class_out)
    }
    if not totals or sum(totals.values()) == 0:
        return None

    ranked = sorted(totals.items(), key=lambda item: item[1], reverse=True)
    classes = [model_classes.get(int(cls_id), f"cls_{cls_id}") for cls_id, _ in ranked]
    values = [count for _, count in ranked]

    fig, ax = plt.subplots(figsize=(10, 4.5), facecolor=C_BG)
    n = len(classes)
    # Narrower bars as the class count grows, bounded to [0.15, 0.45].
    bar_width = min(0.45, max(0.15, 0.6 / max(n, 1)))
    bars = ax.bar(range(n), values, width=bar_width, color=C_BAR,
                  edgecolor=C_BG, linewidth=0.5, zorder=3)
    ax.set_xticks(range(n))
    ax.set_xticklabels(classes, rotation=35, ha="right", fontsize=9, fontweight="500")
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))

    for bar, value in zip(bars, values):
        ax.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.15, str(value),
                ha="center", va="bottom", fontsize=9, fontweight="700", color=C_PRIMARY)

    _style(ax, "Class Dominance", "", "Vehicle Count")
    _annotate(ax, f"Total: {sum(values)} vehicles | {n} classes detected")

    return _save(fig, os.path.join(out_dir, "class_dominance.png"), fmt)


def confidence_dist(conf_scores, out_dir, fmt="png"):
    """Draw a 30-bin histogram of confidence scores with a dashed mean line.

    Returns:
        ``"confidence_dist.<ext>"``, or ``None`` when ``conf_scores`` is empty.
    """
    if not conf_scores:
        return None

    fig, ax = plt.subplots(figsize=(9, 4), facecolor=C_BG)
    ax.hist(conf_scores, bins=30, color=C_CONF, alpha=0.85, edgecolor=C_BG, linewidth=0.8)
    ax.yaxis.set_major_locator(MaxNLocator(integer=True))
    _style(ax, "Detection Confidence Distribution", "Confidence Score", "Detections")

    mean_c = np.mean(conf_scores)
    median_c = np.median(conf_scores)
    ax.axvline(mean_c, color=C_PRIMARY, linewidth=1, linestyle="--", alpha=0.6)
    _annotate(ax, f"Mean: {mean_c:.3f} | Median: {median_c:.3f} | N={len(conf_scores)}")

    return _save(fig, os.path.join(out_dir, "confidence_dist.png"), fmt)


def export_csv(raw_events, out_dir):
    """Write ``raw_events`` (a sequence of rows) to ``raw_data.csv``.

    Nothing is written when there is at most one row (presumably a header
    only -- confirm against the producer of ``raw_events``).

    Returns:
        ``"raw_data.csv"``, or ``None`` when nothing was written.
    """
    if not raw_events or len(raw_events) <= 1:
        return None
    path = os.path.join(out_dir, "raw_data.csv")
    with open(path, mode="w", newline="") as f:
        csv.writer(f).writerows(raw_events)
    return "raw_data.csv"


def _read_first_frame(video_path):
    """Return the first frame of the video, or ``None`` if it cannot be read."""
    cap = cv2.VideoCapture(video_path)
    try:
        ok, frame = cap.read()
    finally:
        cap.release()
    if not ok or frame is None:
        return None
    return frame


def _confidence_density(heatmap_points, h, w):
    """Accumulate one confidence-weighted Gaussian stamp per point.

    Each point is ``[cx, cy]`` or ``[cx, cy, conf]``; a missing ``conf``
    counts as 1.0. Returns the blurred ``(h, w)`` float32 density grid.
    """
    density = np.zeros((h, w), dtype=np.float32)

    # Kernel radius: ~3% of the shorter dimension, min 20px
    kernel_r = max(20, int(min(h, w) * 0.03))
    kernel_size = kernel_r * 2 + 1

    # Pre-build a unit Gaussian kernel to stamp for each detection
    kernel_1d = cv2.getGaussianKernel(kernel_size, kernel_r / 2.5)
    unit_kernel = (kernel_1d @ kernel_1d.T).astype(np.float32)

    for pt in heatmap_points:
        cx, cy = int(pt[0]), int(pt[1])
        conf = float(pt[2]) if len(pt) > 2 else 1.0

        # Kernel bounding box, clipped to the frame
        x0 = max(0, cx - kernel_r)
        y0 = max(0, cy - kernel_r)
        x1 = min(w, cx + kernel_r + 1)
        y1 = min(h, cy + kernel_r + 1)

        # Matching slice inside the kernel
        kx0 = x0 - (cx - kernel_r)
        ky0 = y0 - (cy - kernel_r)
        kx1 = kx0 + (x1 - x0)
        ky1 = ky0 + (y1 - y0)

        # Points whose stamp lies entirely outside the frame are skipped.
        if x1 > x0 and y1 > y0 and kx1 > kx0 and ky1 > ky0:
            density[y0:y1, x0:x1] += conf * unit_kernel[ky0:ky1, kx0:kx1]

    # Mild additional smoothing pass
    return cv2.GaussianBlur(density, (31, 31), 0)


def _blend_heatmap(frame, density_norm):
    """Alpha-blend the JET-coloured density over ``frame``.

    Only pixels above 4% of the maximum density are painted; their opacity
    scales with density up to 0.72.
    """
    heatmap_color = cv2.applyColorMap(density_norm, cv2.COLORMAP_JET)
    threshold = int(0.04 * 255)
    mask = (density_norm > threshold)[..., np.newaxis]
    alpha = (density_norm.astype(np.float32) / 255.0 * 0.72)[..., np.newaxis]
    blended = (1.0 - alpha) * frame + alpha * heatmap_color
    return np.where(mask, blended, frame).astype(np.uint8)


def _draw_confidence_legend(overlay):
    """Draw the colour-bar legend in the bottom-left corner of ``overlay``.

    ``overlay`` is modified in place. The legend is skipped when the frame is
    too small to hold it, because the fixed-size slice assignments below would
    otherwise fail with a shape mismatch.
    """
    h, w = overlay.shape[:2]
    bar_w, bar_h = min(240, w // 4), 14
    bar_x, bar_y = 16, h - bar_h - 36
    panel_pad = 10

    if bar_w < 1 or bar_y - panel_pad < 0 or bar_x + bar_w > w:
        return

    gradient = np.tile(np.arange(256, dtype=np.uint8), (bar_h, 1))
    gradient_color = cv2.applyColorMap(gradient, cv2.COLORMAP_JET)
    gradient_resized = cv2.resize(gradient_color, (bar_w, bar_h))

    # Semi-transparent dark panel behind the legend
    rows = slice(bar_y - panel_pad, bar_y + bar_h + panel_pad + 18)
    cols = slice(bar_x - panel_pad, bar_x + bar_w + panel_pad)
    panel = overlay[rows, cols]
    if panel.size > 0:
        dark = np.full_like(panel, 15)
        overlay[rows, cols] = cv2.addWeighted(panel, 0.35, dark, 0.65, 0)

    overlay[bar_y:bar_y + bar_h, bar_x:bar_x + bar_w] = gradient_resized

    font = cv2.FONT_HERSHEY_SIMPLEX
    font_s = 0.35
    thickness = 1
    label_color = (220, 220, 220)

    cv2.putText(overlay, "Low Confidence", (bar_x, bar_y + bar_h + 14),
                font, font_s, label_color, thickness, cv2.LINE_AA)

    # Right-align the second label with the end of the bar.
    high_label = "High Confidence"
    (tw, _), _ = cv2.getTextSize(high_label, font, font_s, thickness)
    cv2.putText(overlay, high_label, (bar_x + bar_w - tw, bar_y + bar_h + 14),
                font, font_s, label_color, thickness, cv2.LINE_AA)

    # Title label above the bar
    cv2.putText(overlay, "Detection Confidence Density (xAI)",
                (bar_x, bar_y - panel_pad + 8),
                font, 0.38, (180, 180, 180), thickness, cv2.LINE_AA)


def spatial_heatmap(heatmap_points, video_path, out_dir, fmt="png"):
    """Render a confidence-weighted spatial density map over the first frame.

    Algorithm:
      1. For each detection (cx, cy, conf), stamp a 2D Gaussian kernel whose
         radius scales with the frame size, weighted by conf.
      2. Accumulate the weighted kernels into a float32 density grid.
      3. Apply a mild Gaussian blur for visual smoothness.
      4. Normalize to [0, 255] and apply COLORMAP_JET.
      5. Blend over the first video frame only where density > threshold.
      6. Annotate with a legend showing the confidence scale.

    Args:
        heatmap_points: Sequence of ``[cx, cy]`` or ``[cx, cy, conf]`` entries.
        video_path: Video whose first frame is used as the background.
        out_dir: Directory that receives the output file.
        fmt: ``"pdf"`` wraps the overlay in a matplotlib figure; any other
            value writes a PNG with OpenCV.

    Returns:
        ``"heatmap.pdf"`` or ``"heatmap.png"``. ``None`` when there are no
        points, the video is missing or unreadable, the density is zero
        everywhere, or the PNG could not be written.
    """
    if not heatmap_points or not video_path or not os.path.exists(video_path):
        return None

    frame = _read_first_frame(video_path)
    if frame is None:
        return None

    h, w = frame.shape[:2]
    density = _confidence_density(heatmap_points, h, w)

    max_val = np.max(density)
    if max_val <= 0:
        return None
    density_norm = (density / max_val * 255.0).astype(np.uint8)

    overlay = _blend_heatmap(frame, density_norm)
    _draw_confidence_legend(overlay)

    if fmt == "pdf":
        overlay_rgb = cv2.cvtColor(overlay, cv2.COLOR_BGR2RGB)
        fig, ax = plt.subplots(figsize=(12, 7), facecolor=C_BG)
        ax.imshow(overlay_rgb)
        ax.set_title("Detection Confidence Density Map · xAI Spatial Explanation",
                     fontsize=13, fontweight="700", color=C_PRIMARY, pad=14)
        ax.axis('off')
        fig.text(0.5, 0.01,
                 "Brighter regions = higher accumulated detector confidence. "
                 "Generated from confidence-weighted Gaussian kernel density estimation.",
                 ha="center", fontsize=7, color=C_ACCENT)
        return _save(fig, os.path.join(out_dir, "heatmap.pdf"), "pdf")

    path = os.path.join(out_dir, "heatmap.png")
    # imwrite reports failure through its return value; do not advertise a
    # file that was never written.
    if not cv2.imwrite(path, overlay):
        return None
    return "heatmap.png"


def generate_all(data, model_classes, out_dir, report_format="png"):
    """Generate every report artifact and return the names of the files written.

    Args:
        data: Run results. ``class_in`` and ``class_out`` are required; the
            keys ``flow_times``, ``congestion``, ``conf_scores``,
            ``heatmap_points``, ``video_path``, ``raw_events``, ``video_meta``
            and ``engine_config`` are optional. The truthy flags
            ``export_csv`` / ``export_json`` enable the matching export.
        model_classes: Mapping passed through to ``class_dominance``.
        out_dir: Output directory; created if missing.
        report_format: ``"pdf"`` for PDF charts; any other value yields PNG.

    Returns:
        Base names of the generated files, in generation order. Artifacts that
        had nothing to render are omitted.
    """
    os.makedirs(out_dir, exist_ok=True)

    # Note: this changes matplotlib's process-wide defaults.
    plt.rcParams.update({
        "font.family": "sans-serif",
        "font.sans-serif": ["DejaVu Sans", "Arial", "Helvetica"],
        "axes.unicode_minus": False,
    })

    total_in = sum(data["class_in"].values())
    total_out = sum(data["class_out"].values())
    fmt = report_format

    video_path = data.get("video_path")
    heatmap_points = data.get("heatmap_points", [])
    raw_events = data.get("raw_events", [])

    tasks = [
        lambda: direction_pie(total_in, total_out, out_dir, fmt),
        lambda: flow_histogram(data.get("flow_times", []), out_dir, fmt),
        lambda: congestion_chart(data.get("congestion", []), out_dir, fmt),
        lambda: class_dominance(data["class_in"], data["class_out"],
                                model_classes, out_dir, fmt),
        lambda: confidence_dist(data.get("conf_scores", []), out_dir, fmt),
        lambda: spatial_heatmap(heatmap_points, video_path, out_dir, fmt),
    ]

    if data.get("export_csv", False):
        tasks.append(lambda: export_csv(raw_events, out_dir))

    if data.get("export_json", False):
        tasks.append(lambda: export_json(
            data,
            data.get("video_meta", {}),
            data.get("engine_config", {}),
            out_dir,
        ))

    files = []
    for task in tasks:
        name = task()
        if name:
            files.append(name)
    return files