|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import gradio as gr |
|
|
import numpy as np, cv2, json, os, tempfile |
|
|
from collections import defaultdict |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# --- Tunable detection parameters ---

# Angular deviation (degrees) from the zone's dominant flow at or above
# which a vehicle is labelled WRONG (see classify_wrong_direction).
ANGLE_THRESHOLD = 60

# Number of trailing trajectory points used when smoothing the
# direction estimate (see smooth_direction).
SMOOTH_FRAMES = 5

# Fraction of the frame height, measured from the top, treated as an
# entry zone and excluded from classification to avoid false positives.
ENTRY_ZONE_RATIO = 0.15

# Confidence percentage bounds used by angle_to_confidence.
CONF_MIN, CONF_MAX = 0, 100
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def load_flow_model(flow_model_json):
    """Load the Stage-2 flow model and return the per-zone flow centers.

    Parameters
    ----------
    flow_model_json : str
        Path to a JSON file containing a "zone_flow_centers" key: a list
        (one entry per zone) of lists of 2-D direction vectors.

    Returns
    -------
    list[np.ndarray]
        One array of flow-center vectors per zone.
    """
    # Use a context manager so the file handle is closed deterministically;
    # the original `json.load(open(...))` leaked the handle.
    with open(flow_model_json) as f:
        model = json.load(f)
    return [np.array(zone) for zone in model["zone_flow_centers"]]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_trajectories(json_file):
    """Load per-track trajectories from a Stage-1 JSON file.

    Parameters
    ----------
    json_file : str
        Path to a JSON file mapping track id -> list of [x, y] points.

    Returns
    -------
    dict[str, np.ndarray]
        Track id -> (N, 2) point array, keeping only tracks with more
        than two points (shorter tracks carry too little motion
        information for direction estimation).
    """
    # Use a context manager so the file handle is closed deterministically;
    # the original `json.load(open(...))` leaked the handle.
    with open(json_file) as f:
        data = json.load(f)
    return {tid: np.array(pts) for tid, pts in data.items() if len(pts) > 2}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def smooth_direction(pts, window=SMOOTH_FRAMES):
    """Estimate a unit direction vector from the tail of a trajectory.

    Averages the per-step displacements over the last `window` points and
    normalises the result, with a small epsilon guarding against a
    zero-length mean. Returns the zero vector when fewer than two points
    are available.
    """
    if len(pts) < 2:
        return np.array([0, 0])
    recent = pts[-window:]
    step_vectors = np.diff(recent, axis=0)
    mean_step = step_vectors.mean(axis=0)
    magnitude = np.linalg.norm(mean_step)
    return mean_step / (magnitude + 1e-6)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def angle_between(v1, v2):
    """Return the angle, in degrees, between two 2-D vectors.

    Each vector is normalised first (with an epsilon guard against
    zero-length input); the cosine of the angle is clipped to [-1, 1]
    to absorb floating-point drift before taking the arccos.
    """
    eps = 1e-6
    unit1 = v1 / (np.linalg.norm(v1) + eps)
    unit2 = v2 / (np.linalg.norm(v2) + eps)
    cosine = np.clip(np.dot(unit1, unit2), -1, 1)
    return np.degrees(np.arccos(cosine))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_zone_idx(y, frame_h, n_zones):
    """Map a vertical pixel coordinate to a horizontal-band zone index.

    The frame is divided into `n_zones` equal-height bands; the result
    is clamped to [0, n_zones - 1] so out-of-frame coordinates still
    resolve to a valid zone.
    """
    band_height = frame_h / n_zones
    raw_idx = y // band_height
    clamped = min(max(raw_idx, 0), n_zones - 1)
    return int(clamped)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def angle_to_confidence(angle):
    """Map an angular deviation (degrees) to a confidence percentage.

    The mapping is linear over the valid range:
        0°   → 100%  (perfectly aligned with the zone flow)
        90°  → 50%
        180° → 0%
    Negative angles and angles >= 180° yield CONF_MIN.

    NOTE: the original docstring claimed "ANGLE_THRESHOLD° → 50%", but the
    formula puts the 50% point at 90°. The OK/WRONG decision threshold is
    applied separately by the caller; this function only scores alignment.

    Returns
    -------
    float or int
        Confidence rounded to one decimal place, or CONF_MIN for
        out-of-range angles.
    """
    # Out-of-range angles carry no directional confidence.
    if angle < 0 or angle >= 180:
        return CONF_MIN

    conf = max(CONF_MIN, CONF_MAX - (angle / 180) * 100)
    return round(conf, 1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def classify_wrong_direction(traj_json, flow_model_json, bg_img=None):
    """Classify each tracked vehicle as OK or WRONG against the flow model.

    Parameters
    ----------
    traj_json : str
        Path to the Stage-1 trajectories JSON (track id -> [x, y] points).
    flow_model_json : str
        Path to the Stage-2 flow-model JSON ("zone_flow_centers").
    bg_img : str, optional
        Path to a background frame to draw on; a dark 600x900 canvas is
        used when absent or missing on disk.
        NOTE(review): gr.File inputs are assumed to arrive as file paths
        here — confirm against the installed Gradio version.

    Returns
    -------
    tuple[str, list[dict]]
        Path to the annotated JPEG, and per-vehicle result dicts with
        keys: id, zone, angle, confidence, label.
    """
    tracks = extract_trajectories(traj_json)
    centers_by_zone = load_flow_model(flow_model_json)

    # Draw on the supplied background frame if it exists, else a dark canvas.
    if bg_img and os.path.exists(bg_img):
        bg = cv2.imread(bg_img)
    else:
        bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
    h, w = bg.shape[:2]

    # All annotations go on a copy so the final image can blend them
    # semi-transparently over the original background.
    overlay = bg.copy()
    font = cv2.FONT_HERSHEY_SIMPLEX
    results = []

    for tid, pts in tracks.items():
        # Need at least 3 points for a meaningful direction estimate.
        if len(pts) < 3:
            continue
        cur_pt = pts[-1]
        y = cur_pt[1]
        zone_idx = get_zone_idx(y, h, len(centers_by_zone))

        # Skip vehicles still inside the top entry band — directions there
        # are unreliable and produce false positives.
        if y < h * ENTRY_ZONE_RATIO:
            continue

        # Smoothed motion direction vs. every dominant flow vector of the
        # vehicle's zone; the smallest angular deviation is the best match.
        v = smooth_direction(pts)
        centers = centers_by_zone[zone_idx]
        angles = [angle_between(v, c) for c in centers]
        best_angle = min(angles)

        conf = angle_to_confidence(best_angle)
        label = "OK" if best_angle < ANGLE_THRESHOLD else "WRONG"
        color = (0, 255, 0) if label == "OK" else (0, 0, 255)  # BGR: green / red

        # Draw the trajectory polyline, the current position marker, and
        # an id/label/confidence caption next to the vehicle.
        for p1, p2 in zip(pts[:-1], pts[1:]):
            cv2.line(overlay, tuple(p1.astype(int)), tuple(p2.astype(int)), color, 2)
        cv2.circle(overlay, tuple(cur_pt.astype(int)), 5, color, -1)
        cv2.putText(
            overlay,
            f"ID:{tid} {label} ({conf}%)",
            (int(cur_pt[0]) + 5, int(cur_pt[1]) - 5),
            font, 0.6, color, 2
        )

        results.append({
            "id": tid,
            "zone": int(zone_idx),
            "angle": round(best_angle, 1),
            "confidence": conf,
            "label": label
        })

    # Blend the annotations over the background (60/40) and write the
    # result to a temporary JPEG whose path is returned to Gradio.
    combined = cv2.addWeighted(bg, 0.6, overlay, 0.4, 0)
    out_path = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False).name
    cv2.imwrite(out_path, combined)
    return out_path, results
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Markdown description rendered in the Gradio UI below the title.
description_text = """
### 🚦 Wrong-Direction Detection (Stage 3 — with Confidence)
- Compares each vehicle’s motion to its zone’s dominant flow.
- Uses angular difference → smaller angle ⇒ higher confidence.
- Ignores entry region to avoid false positives.
- Displays ID, label, and confidence percentage.
"""
|
|
|
|
|
# Gradio UI: three file inputs (Stage-1 trajectories, Stage-2 flow model,
# optional background frame) mapped onto classify_wrong_direction, which
# returns an annotated image and per-vehicle JSON results.
demo = gr.Interface(
    fn=classify_wrong_direction,
    inputs=[
        gr.File(label="Trajectories JSON (Stage 1)"),
        gr.File(label="Flow Model JSON (Stage 2)"),
        gr.File(label="Optional background frame (.jpg)")
    ],
    outputs=[
        gr.Image(label="Annotated Output"),
        gr.JSON(label="Per-Vehicle Results")
    ],
    title="🚗 Stage 3 — Wrong-Direction Detection (with Confidence)",
    description=description_text
)

# Launch the app only when run as a script, not when imported.
if __name__ == "__main__":
    demo.launch()
|
|
|