nishanth-saka's picture
Stage 3 (Angle + Temporal + Zone-Aware + Entry Gating)
e02542b verified
raw
history blame
5.93 kB
# ============================================================
# 🚦 Stage 3 – Wrong-Direction Detection
# (Angle + Temporal + Zone-Aware + Entry Gating + Confidence)
# ============================================================
import gradio as gr
import numpy as np, cv2, json, os, tempfile
from collections import defaultdict
# ------------------------------------------------------------
# ⚙️ CONFIG
# ------------------------------------------------------------
ANGLE_THRESHOLD = 60 # degrees → above this = WRONG
SMOOTH_FRAMES = 5 # frames for temporal smoothing
ENTRY_ZONE_RATIO = 0.15 # top 15% = entry region (skip)
CONF_MIN, CONF_MAX = 0, 100
# ------------------------------------------------------------
# 1️⃣ Load flow model (Stage 2)
# ------------------------------------------------------------
def load_flow_model(flow_model_json):
model = json.load(open(flow_model_json))
centers = [np.array(z) for z in model["zone_flow_centers"]]
return centers
# ------------------------------------------------------------
# 2️⃣ Extract trajectories
# ------------------------------------------------------------
def extract_trajectories(json_file):
data = json.load(open(json_file))
tracks = {tid: np.array(pts) for tid, pts in data.items() if len(pts) > 2}
return tracks
# ------------------------------------------------------------
# 3️⃣ Smoothed direction for a trajectory
# ------------------------------------------------------------
def smooth_direction(pts, window=SMOOTH_FRAMES):
if len(pts) < 2:
return np.array([0, 0])
diffs = np.diff(pts[-window:], axis=0)
v = np.mean(diffs, axis=0)
n = np.linalg.norm(v)
return v / (n + 1e-6)
# ------------------------------------------------------------
# 4️⃣ Compute angular difference (deg)
# ------------------------------------------------------------
def angle_between(v1, v2):
v1 = v1 / (np.linalg.norm(v1) + 1e-6)
v2 = v2 / (np.linalg.norm(v2) + 1e-6)
cosang = np.clip(np.dot(v1, v2), -1, 1)
return np.degrees(np.arccos(cosang))
# ------------------------------------------------------------
# 5️⃣ Determine zone index for y
# ------------------------------------------------------------
def get_zone_idx(y, frame_h, n_zones):
zone_height = frame_h / n_zones
return int(np.clip(y // zone_height, 0, n_zones - 1))
# ------------------------------------------------------------
# 6️⃣ Confidence mapping
# ------------------------------------------------------------
def angle_to_confidence(angle):
"""
0° → 100% confidence
ANGLE_THRESHOLD° → 50%
180° → 0%
"""
if angle < 0:
return CONF_MIN
if angle >= 180:
return CONF_MIN
# linear mapping: smaller angle = higher confidence
conf = max(CONF_MIN, CONF_MAX - (angle / 180) * 100)
return round(conf, 1)
# ------------------------------------------------------------
# 7️⃣ Main logic
# ------------------------------------------------------------
def classify_wrong_direction(traj_json, flow_model_json, bg_img=None):
tracks = extract_trajectories(traj_json)
centers_by_zone = load_flow_model(flow_model_json)
if bg_img and os.path.exists(bg_img):
bg = cv2.imread(bg_img)
else:
bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
h, w = bg.shape[:2]
overlay = bg.copy()
font = cv2.FONT_HERSHEY_SIMPLEX
results = []
for tid, pts in tracks.items():
if len(pts) < 3:
continue
cur_pt = pts[-1]
y = cur_pt[1]
zone_idx = get_zone_idx(y, h, len(centers_by_zone))
# Skip entry region
if y < h * ENTRY_ZONE_RATIO:
continue
v = smooth_direction(pts)
centers = centers_by_zone[zone_idx]
angles = [angle_between(v, c) for c in centers]
best_angle = min(angles)
# Confidence & label
conf = angle_to_confidence(best_angle)
label = "OK" if best_angle < ANGLE_THRESHOLD else "WRONG"
color = (0, 255, 0) if label == "OK" else (0, 0, 255)
# Draw trajectory & label
for p1, p2 in zip(pts[:-1], pts[1:]):
cv2.line(overlay, tuple(p1.astype(int)), tuple(p2.astype(int)), color, 2)
cv2.circle(overlay, tuple(cur_pt.astype(int)), 5, color, -1)
cv2.putText(
overlay,
f"ID:{tid} {label} ({conf}%)",
(int(cur_pt[0]) + 5, int(cur_pt[1]) - 5),
font, 0.6, color, 2
)
results.append({
"id": tid,
"zone": int(zone_idx),
"angle": round(best_angle, 1),
"confidence": conf,
"label": label
})
combined = cv2.addWeighted(bg, 0.6, overlay, 0.4, 0)
out_path = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False).name
cv2.imwrite(out_path, combined)
return out_path, results
# ------------------------------------------------------------
# 🖥️ Gradio Interface
# ------------------------------------------------------------
description_text = """
### 🚦 Wrong-Direction Detection (Stage 3 — with Confidence)
- Compares each vehicle’s motion to its zone’s dominant flow.
- Uses angular difference → smaller angle ⇒ higher confidence.
- Ignores entry region to avoid false positives.
- Displays ID, label, and confidence percentage.
"""
demo = gr.Interface(
fn=classify_wrong_direction,
inputs=[
gr.File(label="Trajectories JSON (Stage 1)"),
gr.File(label="Flow Model JSON (Stage 2)"),
gr.File(label="Optional background frame (.jpg)")
],
outputs=[
gr.Image(label="Annotated Output"),
gr.JSON(label="Per-Vehicle Results")
],
title="🚗 Stage 3 — Wrong-Direction Detection (with Confidence)",
description=description_text
)
if __name__ == "__main__":
demo.launch()