nishanth-saka's picture
Stage 3 (Video Output Version)
b5ff74a verified
raw
history blame
6.1 kB
# ============================================================
# ๐Ÿšฆ Stage 3 โ€“ Wrong-Direction Detection (Video Output Version)
# ============================================================
import gradio as gr
import numpy as np, cv2, json, os, tempfile
from collections import defaultdict
import math
# ------------------------------------------------------------
# ⚙️ CONFIG
# ------------------------------------------------------------
ANGLE_THRESHOLD = 60     # degrees; best angle vs. zone flow above this → "WRONG"
SMOOTH_FRAMES = 5        # number of past frames averaged when smoothing direction
ENTRY_ZONE_RATIO = 0.15  # skip points in the top 15% of the frame (entry zone)
CONF_MIN, CONF_MAX = 0, 100   # confidence score bounds, in percent
FPS = 25                 # frame rate of the rendered output video
# ------------------------------------------------------------
# ๐Ÿ”ง Helper โ€“ universal loader for Gradio inputs
# ------------------------------------------------------------
def load_json_input(file_obj):
    """Load JSON from any of the shapes Gradio may pass for a File input.

    Accepts a dict carrying a ``"name"`` key (older Gradio versions), an
    object exposing a ``.name`` attribute (tempfile wrapper), or a plain
    path string.

    Returns the parsed JSON object.
    Raises ValueError when the input is None or an unsupported type.
    """
    if file_obj is None:
        raise ValueError("No file provided.")
    # Resolve the concrete filesystem path first, then open it once below.
    if isinstance(file_obj, dict) and "name" in file_obj:
        path = file_obj["name"]
    elif hasattr(file_obj, "name"):
        path = file_obj.name
    elif isinstance(file_obj, str):
        path = file_obj
    else:
        raise ValueError("Unsupported file input type.")
    # Context manager closes the handle deterministically; the original
    # json.load(open(path)) left it to the garbage collector.
    with open(path, encoding="utf-8") as fh:
        return json.load(fh)
# ------------------------------------------------------------
# ๐Ÿงฉ Load Stage 2 flow model
# ------------------------------------------------------------
def load_flow_model(flow_model_json):
    """Read a Stage-2 flow-model file and return its per-zone flow centers
    as a list of NumPy arrays (one entry per zone)."""
    model = load_json_input(flow_model_json)
    return [np.array(center) for center in model["zone_flow_centers"]]
# ------------------------------------------------------------
# ๐Ÿงฉ Extract trajectories (Stage 1)
# ------------------------------------------------------------
def extract_trajectories(json_file):
    """Parse Stage-1 output into ``{track_id: (N, 2) ndarray}``.

    Tracks with fewer than three points are dropped — they carry no
    usable direction information.
    """
    raw = load_json_input(json_file)
    tracks = {}
    for track_id, points in raw.items():
        if len(points) > 2:
            tracks[track_id] = np.array(points)
    return tracks
# ------------------------------------------------------------
# ๐Ÿงฎ Direction + Angle + Confidence Helpers
# ------------------------------------------------------------
def smooth_direction(pts, window=SMOOTH_FRAMES):
    """Return the unit mean-velocity vector over the last ``window`` points.

    Falls back to a zero vector when fewer than two points are available;
    the small epsilon keeps the normalisation division safe.
    """
    if len(pts) < 2:
        return np.array([0, 0])
    step_vectors = np.diff(pts[-window:], axis=0)
    mean_velocity = step_vectors.mean(axis=0)
    return mean_velocity / (np.linalg.norm(mean_velocity) + 1e-6)
def angle_between(v1, v2):
    """Angle in degrees (0–180) between two vectors.

    Both vectors are normalised with a small epsilon so zero-length
    inputs do not divide by zero; the cosine is clipped to [-1, 1]
    before arccos to absorb floating-point overshoot.
    """
    unit1 = v1 / (np.linalg.norm(v1) + 1e-6)
    unit2 = v2 / (np.linalg.norm(v2) + 1e-6)
    cos_angle = np.clip(np.dot(unit1, unit2), -1, 1)
    return np.degrees(np.arccos(cos_angle))
def angle_to_confidence(angle):
    """Map an angle (degrees) to a confidence percentage.

    0° maps to 100%, falling linearly to 0% at 180°; angles outside
    [0, 180) yield the minimum confidence. Result is rounded to one
    decimal place.
    """
    if not 0 <= angle < 180:
        return CONF_MIN
    scaled = CONF_MAX - (angle / 180) * 100
    return round(max(CONF_MIN, scaled), 1)
def get_zone_idx(y, frame_h, n_zones):
    """Index (0 .. n_zones-1) of the horizontal zone containing row ``y``.

    The frame is divided into ``n_zones`` equal horizontal bands; the
    result is clamped so out-of-frame coordinates map to the nearest band.
    """
    zone_height = frame_h / n_zones
    idx = int(y // zone_height)
    return min(max(idx, 0), n_zones - 1)
# ------------------------------------------------------------
# ๐ŸŽฅ Main logic โ†’ annotated video
# ------------------------------------------------------------
def classify_wrong_direction_video(traj_json, flow_model_json, bg_img=None):
    """Render an annotated MP4 marking each track OK/WRONG per frame.

    Parameters
    ----------
    traj_json : Gradio file input with Stage-1 trajectories
        (``{track_id: [[x, y], ...]}``).
    flow_model_json : Gradio file input with the Stage-2 flow model
        (``{"zone_flow_centers": [...]}``).
    bg_img : optional Gradio file input with a background frame; a dark
        blank canvas is used when absent or unreadable.

    Returns the filesystem path of the rendered ``.mp4``.
    Raises ValueError when no usable trajectories are present.
    """
    tracks = extract_trajectories(traj_json)
    centers_by_zone = load_flow_model(flow_model_json)
    if not tracks:
        # max() below would otherwise raise a cryptic ValueError on an
        # empty dict — fail with an actionable message instead.
        raise ValueError("No usable trajectories (need tracks with more than 2 points).")

    bg = _resolve_background(bg_img)
    h, w = bg.shape[:2]

    # The video runs as long as the longest trajectory.
    max_len = max(len(p) for p in tracks.values())
    out_path = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(out_path, fourcc, FPS, (w, h))
    font = cv2.FONT_HERSHEY_SIMPLEX
    try:
        # Render frame-by-frame.
        for fi in range(max_len):
            frame = bg.copy()
            for tid, pts in tracks.items():
                if fi >= len(pts):
                    continue  # this track has already ended
                cur_pt = pts[fi]
                y = cur_pt[1]
                # Entry-zone gating first: points near the top of the frame
                # are skipped, so there is no need to compute their zone.
                if y < h * ENTRY_ZONE_RATIO:
                    continue
                zone_idx = get_zone_idx(y, h, len(centers_by_zone))
                # Smoothed direction over a causal window (past frames only).
                win = pts[max(0, fi - SMOOTH_FRAMES):fi + 1]
                v = smooth_direction(win)
                # Compare against every flow center of the current zone and
                # keep the smallest deviation angle.
                centers = centers_by_zone[zone_idx]
                best_angle = min(angle_between(v, c) for c in centers)
                conf = angle_to_confidence(best_angle)
                label = "OK" if best_angle < ANGLE_THRESHOLD else "WRONG"
                color = (0, 255, 0) if label == "OK" else (0, 0, 255)
                # Draw the trajectory traversed so far plus the current point.
                for p1, p2 in zip(pts[:fi], pts[1:fi + 1]):
                    cv2.line(frame, tuple(p1.astype(int)), tuple(p2.astype(int)), color, 2)
                cv2.circle(frame, tuple(cur_pt.astype(int)), 5, color, -1)
                cv2.putText(frame, f"ID:{tid} {label} ({conf}%)",
                            (int(cur_pt[0]) + 5, int(cur_pt[1]) - 5),
                            font, 0.55, color, 2)
            writer.write(frame)
    finally:
        # Release even when rendering fails mid-way, so the mp4 container
        # is finalized (the original skipped release on exceptions).
        writer.release()
    return out_path


def _resolve_background(bg_img):
    """Unwrap the optional Gradio background input and load it with OpenCV;
    fall back to a 600x900 dark-gray canvas when missing or unreadable."""
    bg = None
    if bg_img:
        if isinstance(bg_img, dict) and "name" in bg_img:
            bg_path = bg_img["name"]
        elif hasattr(bg_img, "name"):
            bg_path = bg_img.name
        else:
            bg_path = bg_img
        # cv2.imread returns None on failure; handled by the fallback below.
        bg = cv2.imread(bg_path)
    if bg is None:
        bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
    return bg
# ------------------------------------------------------------
# ๐Ÿ–ฅ๏ธ Gradio Interface
# ------------------------------------------------------------
description_text = """
### ๐Ÿšฆ Stage 3 โ€“ Wrong-Direction Detection (Video Output)
Uses **trajectories (Stage 1)** + **flow model (Stage 2)** to create an annotated MP4:
- Angle-based + temporal smoothing + zone awareness
- Entry-zone gating
- Confidence (%) per vehicle
"""
# Gradio app: two required JSON uploads (Stage-1 trajectories, Stage-2 flow
# model) plus an optional background image, producing an annotated MP4.
demo = gr.Interface(
    fn=classify_wrong_direction_video,
    inputs=[
        gr.File(label="Trajectories JSON (Stage 1)"),
        gr.File(label="Flow Model JSON (Stage 2)"),
        gr.File(label="Optional background frame (.jpg/.png)")
    ],
    outputs=gr.Video(label="Annotated Video Output"),
    title="🚗 Stage 3 — Wrong-Direction Detection (Video Output)",
    description=description_text
)

# Launch the app only when run as a script (not when imported).
if __name__ == "__main__":
    demo.launch()