# ============================================================ # 🚦 Stage 3 – Wrong-Direction Detection # (Angle + Temporal + Zone-Aware + Entry Gating + Confidence) # ============================================================ import gradio as gr import numpy as np, cv2, json, os, tempfile from collections import defaultdict # ------------------------------------------------------------ # ⚙️ CONFIG # ------------------------------------------------------------ ANGLE_THRESHOLD = 60 # degrees → above this = WRONG SMOOTH_FRAMES = 5 # frames for temporal smoothing ENTRY_ZONE_RATIO = 0.15 # top 15% = entry region (skip) CONF_MIN, CONF_MAX = 0, 100 # ------------------------------------------------------------ # 1️⃣ Load flow model (Stage 2) # ------------------------------------------------------------ def load_flow_model(flow_model_json): model = json.load(open(flow_model_json)) centers = [np.array(z) for z in model["zone_flow_centers"]] return centers # ------------------------------------------------------------ # 2️⃣ Extract trajectories # ------------------------------------------------------------ def extract_trajectories(json_file): data = json.load(open(json_file)) tracks = {tid: np.array(pts) for tid, pts in data.items() if len(pts) > 2} return tracks # ------------------------------------------------------------ # 3️⃣ Smoothed direction for a trajectory # ------------------------------------------------------------ def smooth_direction(pts, window=SMOOTH_FRAMES): if len(pts) < 2: return np.array([0, 0]) diffs = np.diff(pts[-window:], axis=0) v = np.mean(diffs, axis=0) n = np.linalg.norm(v) return v / (n + 1e-6) # ------------------------------------------------------------ # 4️⃣ Compute angular difference (deg) # ------------------------------------------------------------ def angle_between(v1, v2): v1 = v1 / (np.linalg.norm(v1) + 1e-6) v2 = v2 / (np.linalg.norm(v2) + 1e-6) cosang = np.clip(np.dot(v1, v2), -1, 1) return np.degrees(np.arccos(cosang)) # 
# ------------------------------------------------------------
# 5️⃣ Determine zone index for y
# ------------------------------------------------------------
def get_zone_idx(y, frame_h, n_zones):
    """Return the index of the horizontal band that contains ``y``.

    The frame height is split into ``n_zones`` equal bands, numbered from
    the top; the result is clamped to ``[0, n_zones - 1]`` so coordinates
    outside the frame map to the nearest band.

    Raises:
        ZeroDivisionError: If ``n_zones`` is 0.
    """
    zone_height = frame_h / n_zones
    return int(np.clip(y // zone_height, 0, n_zones - 1))


# ------------------------------------------------------------
# 6️⃣ Confidence mapping
# ------------------------------------------------------------
def angle_to_confidence(angle):
    """Map an angular deviation (degrees) linearly onto the confidence range.

        0°   → CONF_MAX
        90°  → midpoint of CONF_MIN and CONF_MAX
        180° → CONF_MIN

    Angles below 0 or at/above 180 return ``CONF_MIN`` unchanged.
    Otherwise the result is a float rounded to one decimal place.
    """
    if angle < 0 or angle >= 180:
        return CONF_MIN
    # Scale by the configured span rather than a hard-coded 100 so the
    # mapping stays correct if CONF_MIN / CONF_MAX are changed.
    conf = CONF_MAX - (angle / 180) * (CONF_MAX - CONF_MIN)
    return round(float(max(CONF_MIN, conf)), 1)


def _load_background(bg_img):
    """Return the background frame, or a dark 600x900 canvas as fallback.

    The fallback is used when no image is given, the path does not exist,
    or OpenCV cannot decode the file (``cv2.imread`` returns ``None``).
    """
    if bg_img is not None and not isinstance(bg_img, (str, bytes, os.PathLike)):
        # NOTE(review): assumes upload objects expose their path as `.name`
        # (as some Gradio versions do for gr.File) — confirm.
        bg_img = getattr(bg_img, "name", bg_img)
    if bg_img and os.path.exists(bg_img):
        frame = cv2.imread(os.fspath(bg_img))
        if frame is not None:
            return frame
    return np.full((600, 900, 3), 40, dtype=np.uint8)


def _to_pixel(pt):
    """Convert a point to an ``(x, y)`` tuple of plain Python ints."""
    return int(pt[0]), int(pt[1])


# ------------------------------------------------------------
# 7️⃣ Main logic
# ------------------------------------------------------------
def classify_wrong_direction(traj_json, flow_model_json, bg_img=None):
    """Label every track as OK / WRONG against its zone's flow directions.

    For each track, the smoothed heading is compared with every flow
    centre of the zone containing the track's last point. The smallest
    angle decides the label (``< ANGLE_THRESHOLD`` → OK) and confidence.
    Tracks whose last point lies in the top ``ENTRY_ZONE_RATIO`` of the
    frame, or whose zone has no flow centres, are skipped.

    Args:
        traj_json: Trajectories JSON (Stage 1).
        flow_model_json: Flow model JSON (Stage 2).
        bg_img: Optional background frame; a blank canvas is used if it is
            missing or unreadable.

    Returns:
        ``(out_path, results)``: path of the annotated JPEG, and a list of
        dicts with keys ``id``, ``zone``, ``angle``, ``confidence``,
        ``label``.

    Raises:
        ValueError: If the flow model defines no zones.
        OSError: If the annotated image cannot be written.
    """
    tracks = extract_trajectories(traj_json)
    centers_by_zone = load_flow_model(flow_model_json)
    n_zones = len(centers_by_zone)
    if n_zones == 0:
        # Fail with a clear message instead of a ZeroDivisionError later.
        raise ValueError("flow model defines no zones")

    bg = _load_background(bg_img)
    h = bg.shape[0]
    overlay = bg.copy()
    font = cv2.FONT_HERSHEY_SIMPLEX

    results = []
    for tid, pts in tracks.items():
        if len(pts) < 3:
            continue

        cur_pt = pts[-1]
        y = cur_pt[1]

        # Skip entry region
        if y < h * ENTRY_ZONE_RATIO:
            continue

        zone_idx = get_zone_idx(y, h, n_zones)
        centers = np.asarray(centers_by_zone[zone_idx], dtype=float)
        if centers.size == 0:
            # No reference flow for this zone: nothing to compare against.
            continue
        # A zone stored as one flat vector is treated as a single centre.
        centers = np.atleast_2d(centers)

        # NOTE(review): a track with no net displacement appears to get a
        # zero heading, which scores 90° and is labelled WRONG — confirm
        # whether stationary vehicles should be skipped instead.
        v = smooth_direction(pts)
        best_angle = float(min(angle_between(v, c) for c in centers))

        # Confidence & label
        conf = angle_to_confidence(best_angle)
        label = "OK" if best_angle < ANGLE_THRESHOLD else "WRONG"
        color = (0, 255, 0) if label == "OK" else (0, 0, 255)

        # Draw trajectory & label. Only the first two columns are used as
        # pixel coordinates, handed to OpenCV as plain Python ints.
        pixels = [_to_pixel(p) for p in pts]
        for p1, p2 in zip(pixels[:-1], pixels[1:]):
            cv2.line(overlay, p1, p2, color, 2)
        cv2.circle(overlay, pixels[-1], 5, color, -1)
        cv2.putText(
            overlay,
            f"ID:{tid} {label} ({conf}%)",
            (pixels[-1][0] + 5, pixels[-1][1] - 5),
            font,
            0.6,
            color,
            2,
        )

        results.append({
            "id": tid,
            "zone": int(zone_idx),
            "angle": round(best_angle, 1),
            "confidence": conf,
            "label": label,
        })

    # Annotations are blended onto the background at 40% weight.
    combined = cv2.addWeighted(bg, 0.6, overlay, 0.4, 0)

    # mkstemp + close leaves no open handle behind while OpenCV writes.
    fd, out_path = tempfile.mkstemp(suffix=".jpg")
    os.close(fd)
    if not cv2.imwrite(out_path, combined):
        raise OSError(f"could not write annotated image to {out_path}")
    return out_path, results


# ------------------------------------------------------------
# 🖥️ Gradio Interface
# ------------------------------------------------------------
description_text = """
### 🚦 Wrong-Direction Detection (Stage 3 — with Confidence)
- Compares each vehicle’s motion to its zone’s dominant flow.
- Uses angular difference → smaller angle ⇒ higher confidence.
- Ignores entry region to avoid false positives.
- Displays ID, label, and confidence percentage.
"""

demo = gr.Interface(
    fn=classify_wrong_direction,
    inputs=[
        gr.File(label="Trajectories JSON (Stage 1)"),
        gr.File(label="Flow Model JSON (Stage 2)"),
        gr.File(label="Optional background frame (.jpg)"),
    ],
    outputs=[
        gr.Image(label="Annotated Output"),
        gr.JSON(label="Per-Vehicle Results"),
    ],
    title="🚗 Stage 3 — Wrong-Direction Detection (with Confidence)",
    description=description_text,
)

if __name__ == "__main__":
    demo.launch()