| | |
| | |
| | |
| |
|
| | import gradio as gr |
| | import numpy as np, cv2, json, os, tempfile |
| | from collections import defaultdict |
| | import math |
| |
|
| | |
| | |
| | |
# ── Tunable detection parameters ─────────────────────────────────────────
ANGLE_THRESHOLD = 60      # degrees; headings deviating more than this are WRONG
SMOOTH_FRAMES = 5         # trailing frames used to smooth a track's heading
ENTRY_ZONE_RATIO = 0.15   # top fraction of the frame treated as an entry band
CONF_MIN = 0              # confidence lower bound (%)
CONF_MAX = 100            # confidence upper bound (%)
FPS = 25                  # frame rate of the rendered output video
| |
|
| | |
| | |
| | |
def load_json_input(file_obj):
    """Parse JSON from a Gradio file input.

    Accepts a dict carrying a "name" key (legacy Gradio file payload),
    an object exposing a ``.name`` attribute (tempfile-style wrapper),
    or a plain filesystem path string.

    Returns:
        The deserialized JSON object.

    Raises:
        ValueError: if no file is given or the input type is unsupported.
    """
    if file_obj is None:
        raise ValueError("No file provided.")
    # Resolve a filesystem path from whichever shape Gradio handed us.
    if isinstance(file_obj, dict) and "name" in file_obj:
        path = file_obj["name"]
    elif hasattr(file_obj, "name"):
        path = file_obj.name
    elif isinstance(file_obj, str):
        path = file_obj
    else:
        raise ValueError("Unsupported file input type.")
    # Context manager closes the handle deterministically — the original
    # leaked open file objects via json.load(open(path)).
    with open(path) as fh:
        return json.load(fh)
| |
|
| | |
| | |
| | |
def load_flow_model(flow_model_json):
    """Load the Stage-2 flow model from a Gradio file input.

    Returns the model's "zone_flow_centers" entries as a list of numpy
    arrays, one entry per zone.
    """
    model = load_json_input(flow_model_json)
    return [np.array(center) for center in model["zone_flow_centers"]]
| |
|
| | |
| | |
| | |
def extract_trajectories(json_file):
    """Read Stage-1 trajectories keyed by track id.

    Tracks with fewer than three points are dropped (too short to
    estimate a direction); the remaining point lists become numpy arrays.
    """
    data = load_json_input(json_file)
    tracks = {}
    for track_id, points in data.items():
        if len(points) > 2:
            tracks[track_id] = np.array(points)
    return tracks
| |
|
| | |
| | |
| | |
def smooth_direction(pts, window=SMOOTH_FRAMES):
    """Return the unit-norm mean heading over the last `window` steps.

    Falls back to the zero vector when fewer than two points exist.
    """
    if len(pts) < 2:
        return np.array([0, 0])
    step_vectors = np.diff(pts[-window:], axis=0)
    mean_vec = np.mean(step_vectors, axis=0)
    # Epsilon guards against division by zero for a stationary track.
    return mean_vec / (np.linalg.norm(mean_vec) + 1e-6)
| |
|
| | def angle_between(v1, v2): |
| | v1 = v1 / (np.linalg.norm(v1)+1e-6) |
| | v2 = v2 / (np.linalg.norm(v2)+1e-6) |
| | cosang = np.clip(np.dot(v1,v2), -1,1) |
| | return np.degrees(np.arccos(cosang)) |
| |
|
def angle_to_confidence(angle):
    """Map a deviation angle (degrees) to a confidence percentage.

    0° maps to CONF_MAX, 180° (or anything out of range) to CONF_MIN,
    falling linearly in between; the result is rounded to one decimal.
    """
    # Out-of-range angles get no confidence at all.
    if angle < 0 or angle >= 180:
        return CONF_MIN
    confidence = CONF_MAX - (angle / 180) * 100
    if confidence < CONF_MIN:
        confidence = CONF_MIN
    return round(confidence, 1)
| |
|
def get_zone_idx(y, frame_h, n_zones):
    """Map a y coordinate to a zone index clamped to [0, n_zones - 1]."""
    zone_height = frame_h / n_zones
    idx = int(y // zone_height)
    return min(max(idx, 0), n_zones - 1)
| |
|
| | |
| | |
| | |
def classify_wrong_direction_video(traj_json, flow_model_json, bg_img=None):
    """Render an annotated MP4 classifying each track as OK / WRONG direction.

    Args:
        traj_json: Stage-1 trajectories JSON (any Gradio file-input shape).
        flow_model_json: Stage-2 flow model with per-zone flow centers.
        bg_img: optional background frame; a dark canvas is used when it
            is missing or unreadable.

    Returns:
        Filesystem path of the written MP4.
    """
    tracks = extract_trajectories(traj_json)
    centers_by_zone = load_flow_model(flow_model_json)

    # Resolve the optional background image from whichever shape Gradio
    # handed us (legacy dict, tempfile wrapper, or plain path string).
    if bg_img:
        if isinstance(bg_img, dict) and "name" in bg_img:
            bg_path = bg_img["name"]
        elif hasattr(bg_img, "name"):
            bg_path = bg_img.name
        else:
            bg_path = bg_img
        bg = cv2.imread(bg_path)
    else:
        bg = None
    if bg is None:
        # Dark-gray canvas when no (readable) background was supplied.
        bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
    h, w = bg.shape[:2]

    # Longest track determines video length; default=0 avoids the
    # ValueError the original raised when every track was filtered out.
    max_len = max((len(p) for p in tracks.values()), default=0)
    # We only need a unique path on disk — close the handle immediately
    # instead of leaking it (the original never closed it).
    tmp = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False)
    out_path = tmp.name
    tmp.close()
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(out_path, fourcc, FPS, (w, h))
    font = cv2.FONT_HERSHEY_SIMPLEX

    try:
        for fi in range(max_len):
            frame = bg.copy()
            for tid, pts in tracks.items():
                if fi >= len(pts):
                    continue
                cur_pt = pts[fi]
                y = cur_pt[1]
                # Skip vehicles still inside the entry band at the top of
                # the frame: their heading is not yet established.
                if y < h * ENTRY_ZONE_RATIO:
                    continue
                zone_idx = get_zone_idx(y, h, len(centers_by_zone))

                # Smoothed heading over the trailing window, compared
                # against the learned flow directions of the current zone.
                win = pts[max(0, fi - SMOOTH_FRAMES):fi + 1]
                v = smooth_direction(win)
                centers = centers_by_zone[zone_idx]
                angles = [angle_between(v, c) for c in centers]
                best_angle = min(angles)
                conf = angle_to_confidence(best_angle)
                label = "OK" if best_angle < ANGLE_THRESHOLD else "WRONG"
                color = (0, 255, 0) if label == "OK" else (0, 0, 255)

                # Draw the trajectory so far, the current position, and
                # the ID / label / confidence annotation.
                for p1, p2 in zip(pts[:fi], pts[1:fi + 1]):
                    cv2.line(frame, tuple(p1.astype(int)), tuple(p2.astype(int)), color, 2)
                cv2.circle(frame, tuple(cur_pt.astype(int)), 5, color, -1)
                cv2.putText(frame, f"ID:{tid} {label} ({conf}%)",
                            (int(cur_pt[0]) + 5, int(cur_pt[1]) - 5),
                            font, 0.55, color, 2)

            writer.write(frame)
    finally:
        # Always release the writer so the MP4 container is finalized
        # even if annotation of a frame raises.
        writer.release()
    return out_path
| |
|
| | |
| | |
| | |
# Markdown rendered above the Gradio interface.
# NOTE(review): the "๐ฆ" / "โ" sequences look like mojibake from
# misdecoded emoji/dash characters — confirm the intended glyphs with the
# author before changing this user-facing string.
description_text = """
### ๐ฆ Stage 3 โ Wrong-Direction Detection (Video Output)
Uses **trajectories (Stage 1)** + **flow model (Stage 2)** to create an annotated MP4:
- Angle-based + temporal smoothing + zone awareness
- Entry-zone gating
- Confidence (%) per vehicle
"""
| |
|
# Gradio wiring: three file inputs -> one annotated-video output.
_file_inputs = [
    gr.File(label="Trajectories JSON (Stage 1)"),
    gr.File(label="Flow Model JSON (Stage 2)"),
    gr.File(label="Optional background frame (.jpg/.png)"),
]
demo = gr.Interface(
    fn=classify_wrong_direction_video,
    inputs=_file_inputs,
    outputs=gr.Video(label="Annotated Video Output"),
    title="๐ Stage 3 โ Wrong-Direction Detection (Video Output)",
    description=description_text,
)

if __name__ == "__main__":
    demo.launch()
| |
|