# ============================================================
# 🚦 Stage 3 – Wrong-Direction Detection (Video Output Version)
# ============================================================
import gradio as gr
import numpy as np, cv2, json, os, tempfile
from collections import defaultdict
import math

# ------------------------------------------------------------
# ⚙️ CONFIG
# ------------------------------------------------------------
ANGLE_THRESHOLD = 60       # deg → above = WRONG
SMOOTH_FRAMES = 5          # temporal smoothing
ENTRY_ZONE_RATIO = 0.15    # skip top 15 %
CONF_MIN, CONF_MAX = 0, 100
FPS = 25                   # output video fps


# ------------------------------------------------------------
# 🔧 Helper – universal loader for Gradio inputs
# ------------------------------------------------------------
def load_json_input(file_obj):
    """Load JSON from any of the input shapes Gradio may pass.

    Accepts a dict with a "name" key, an object with a ``.name``
    attribute (NamedString / tempfile wrapper), or a plain path string.

    Raises:
        ValueError: if ``file_obj`` is None or of an unsupported type.
    """
    if file_obj is None:
        raise ValueError("No file provided.")
    if isinstance(file_obj, dict) and "name" in file_obj:
        path = file_obj["name"]
    elif hasattr(file_obj, "name"):
        path = file_obj.name
    elif isinstance(file_obj, str):
        path = file_obj
    else:
        raise ValueError("Unsupported file input type.")
    # FIX: original used json.load(open(path)) which leaked the file handle.
    with open(path) as fh:
        return json.load(fh)


# ------------------------------------------------------------
# 🧩 Load Stage 2 flow model
# ------------------------------------------------------------
def load_flow_model(flow_model_json):
    """Return the per-zone flow direction centers from a Stage 2 model.

    Each entry of ``zone_flow_centers`` is converted to a numpy array;
    iterating one entry yields that zone's reference direction vectors.
    """
    model = load_json_input(flow_model_json)
    return [np.array(z) for z in model["zone_flow_centers"]]


# ------------------------------------------------------------
# 🧩 Extract trajectories (Stage 1)
# ------------------------------------------------------------
def extract_trajectories(json_file):
    """Return {track_id: Nx2 array of points}, keeping tracks with > 2 points."""
    data = load_json_input(json_file)
    return {tid: np.array(pts) for tid, pts in data.items() if len(pts) > 2}


# ------------------------------------------------------------
# 🧮 Direction + Angle + Confidence Helpers
# ------------------------------------------------------------
def smooth_direction(pts, window=SMOOTH_FRAMES):
    """Return the unit mean direction over the last ``window`` points.

    Returns the zero vector when fewer than 2 points are available.
    """
    if len(pts) < 2:
        return np.array([0, 0])
    diffs = np.diff(pts[-window:], axis=0)
    v = np.mean(diffs, axis=0)
    return v / (np.linalg.norm(v) + 1e-6)


def angle_between(v1, v2):
    """Return the angle in degrees between two vectors (0..180)."""
    v1 = v1 / (np.linalg.norm(v1) + 1e-6)
    v2 = v2 / (np.linalg.norm(v2) + 1e-6)
    cosang = np.clip(np.dot(v1, v2), -1, 1)
    return np.degrees(np.arccos(cosang))


def angle_to_confidence(angle):
    """Map an angle (deg) to a confidence percentage.

    0° → CONF_MAX, linearly decreasing; out-of-range angles → CONF_MIN.
    """
    if angle < 0:
        return CONF_MIN
    if angle >= 180:
        return CONF_MIN
    conf = max(CONF_MIN, CONF_MAX - (angle / 180) * 100)
    return round(conf, 1)


def get_zone_idx(y, frame_h, n_zones):
    """Return the horizontal-band zone index (0..n_zones-1) for row ``y``."""
    zone_h = frame_h / n_zones
    return int(np.clip(y // zone_h, 0, n_zones - 1))


# ------------------------------------------------------------
# 🎥 Main logic → annotated video
# ------------------------------------------------------------
def classify_wrong_direction_video(traj_json, flow_model_json, bg_img=None):
    """Render an annotated MP4 classifying each track as OK / WRONG.

    Args:
        traj_json: Stage 1 trajectories JSON (Gradio file input).
        flow_model_json: Stage 2 flow model JSON (Gradio file input).
        bg_img: optional background frame; a flat dark canvas is used
            when absent or unreadable.

    Returns:
        Path to the written .mp4 file.

    Raises:
        ValueError: on unusable inputs (including a trajectory file with
            no track longer than 2 points).
    """
    tracks = extract_trajectories(traj_json)
    centers_by_zone = load_flow_model(flow_model_json)

    # FIX: original crashed with an opaque ValueError from max() on empty input.
    if not tracks:
        raise ValueError("No usable trajectories (need tracks with > 2 points).")

    # --- background frame (uploaded image, or flat dark canvas) ---
    if bg_img:
        if isinstance(bg_img, dict) and "name" in bg_img:
            bg_path = bg_img["name"]
        elif hasattr(bg_img, "name"):
            bg_path = bg_img.name
        else:
            bg_path = bg_img
        bg = cv2.imread(bg_path)
    else:
        bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
    if bg is None:  # imread failed → fall back to the flat canvas
        bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
    h, w = bg.shape[:2]

    # infer video length from longest track
    max_len = max(len(p) for p in tracks.values())

    out_path = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(out_path, fourcc, FPS, (w, h))
    font = cv2.FONT_HERSHEY_SIMPLEX

    # render frame-by-frame
    for fi in range(max_len):
        frame = bg.copy()
        for tid, pts in tracks.items():
            if fi >= len(pts):
                continue
            cur_pt = pts[fi]
            y = cur_pt[1]
            zone_idx = get_zone_idx(y, h, len(centers_by_zone))
            if y < h * ENTRY_ZONE_RATIO:
                continue  # entry-zone gating: skip the top band
            # smooth direction using past window
            win = pts[max(0, fi - SMOOTH_FRAMES):fi + 1]
            v = smooth_direction(win)
            centers = centers_by_zone[zone_idx]
            # best (smallest) angle against any of the zone's flow directions
            angles = [angle_between(v, c) for c in centers]
            best_angle = min(angles)
            conf = angle_to_confidence(best_angle)
            label = "OK" if best_angle < ANGLE_THRESHOLD else "WRONG"
            color = (0, 255, 0) if label == "OK" else (0, 0, 255)
            # draw trajectory so far
            for p1, p2 in zip(pts[:fi], pts[1:fi + 1]):
                cv2.line(frame, tuple(p1.astype(int)), tuple(p2.astype(int)),
                         color, 2)
            cv2.circle(frame, tuple(cur_pt.astype(int)), 5, color, -1)
            cv2.putText(frame, f"ID:{tid} {label} ({conf}%)",
                        (int(cur_pt[0]) + 5, int(cur_pt[1]) - 5),
                        font, 0.55, color, 2)
        writer.write(frame)
    writer.release()
    return out_path


# ------------------------------------------------------------
# 🖥️ Gradio Interface
# ------------------------------------------------------------
description_text = """
### 🚦 Stage 3 – Wrong-Direction Detection (Video Output)
Uses **trajectories (Stage 1)** + **flow model (Stage 2)** to create an annotated MP4:
- Angle-based + temporal smoothing + zone awareness
- Entry-zone gating
- Confidence (%) per vehicle
"""

demo = gr.Interface(
    fn=classify_wrong_direction_video,
    inputs=[
        gr.File(label="Trajectories JSON (Stage 1)"),
        gr.File(label="Flow Model JSON (Stage 2)"),
        gr.File(label="Optional background frame (.jpg/.png)")
    ],
    outputs=gr.Video(label="Annotated Video Output"),
    title="🚗 Stage 3 — Wrong-Direction Detection (Video Output)",
    description=description_text
)

if __name__ == "__main__":
    demo.launch()