Stage 3 (Video Output Version)

#10
by nishanth-saka - opened
Files changed (1) hide show
  1. app.py +102 -108
app.py CHANGED
@@ -1,165 +1,159 @@
1
  # ============================================================
2
- # ๐Ÿšฆ Stage 3 โ€“ Wrong-Direction Detection
3
- # (Angle + Temporal + Zone-Aware + Entry Gating + Confidence)
4
  # ============================================================
5
 
6
  import gradio as gr
7
  import numpy as np, cv2, json, os, tempfile
8
  from collections import defaultdict
 
9
 
10
# ------------------------------------------------------------
# ⚙️ CONFIG
# ------------------------------------------------------------
ANGLE_THRESHOLD = 60        # degrees; deviations above this are flagged WRONG
SMOOTH_FRAMES = 5           # window size (frames) for temporal smoothing
ENTRY_ZONE_RATIO = 0.15     # top 15% of the frame is the entry region (skipped)
CONF_MIN, CONF_MAX = 0, 100  # confidence percentage bounds
 
17
 
18
  # ------------------------------------------------------------
19
- # 1๏ธโƒฃ Load flow model (Stage 2)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
20
  # ------------------------------------------------------------
21
def load_flow_model(flow_model_json):
    """Load the Stage 2 flow model and return per-zone flow centers.

    Parameters
    ----------
    flow_model_json : str
        Path to a JSON file containing a "zone_flow_centers" key.

    Returns
    -------
    list[numpy.ndarray]
        One array of dominant flow vectors per zone.
    """
    # Context manager closes the handle promptly; the original
    # `json.load(open(...))` left it open until garbage collection.
    with open(flow_model_json) as f:
        model = json.load(f)
    return [np.array(z) for z in model["zone_flow_centers"]]
25
 
26
  # ------------------------------------------------------------
27
- # 2๏ธโƒฃ Extract trajectories
28
  # ------------------------------------------------------------
29
def extract_trajectories(json_file):
    """Load Stage 1 trajectories from JSON, keeping tracks with > 2 points.

    Parameters
    ----------
    json_file : str
        Path to a JSON mapping of track-id -> list of [x, y] points.

    Returns
    -------
    dict[str, numpy.ndarray]
        Track id -> (N, 2) array of points, N > 2 only.
    """
    # Context manager fixes the file-handle leak of `json.load(open(...))`.
    with open(json_file) as f:
        data = json.load(f)
    return {tid: np.array(pts) for tid, pts in data.items() if len(pts) > 2}
33
 
34
  # ------------------------------------------------------------
35
- # 3๏ธโƒฃ Smoothed direction for a trajectory
36
  # ------------------------------------------------------------
37
def smooth_direction(pts, window=SMOOTH_FRAMES):
    """Unit-length mean direction over the last `window` points of a track.

    Returns the zero vector when fewer than two points are available.
    """
    if len(pts) < 2:
        return np.array([0, 0])
    recent = pts[-window:]
    mean_step = np.diff(recent, axis=0).mean(axis=0)
    # Epsilon keeps the division safe for a (near-)stationary track.
    return mean_step / (np.linalg.norm(mean_step) + 1e-6)
44
 
45
- # ------------------------------------------------------------
46
- # 4๏ธโƒฃ Compute angular difference (deg)
47
- # ------------------------------------------------------------
48
def angle_between(v1, v2):
    """Angle in degrees between two vectors (epsilon-safe normalization)."""
    u = v1 / (np.linalg.norm(v1) + 1e-6)
    w = v2 / (np.linalg.norm(v2) + 1e-6)
    # Clip guards arccos against tiny floating-point overshoot.
    cosang = np.clip(np.dot(u, w), -1, 1)
    return np.degrees(np.arccos(cosang))
53
 
54
- # ------------------------------------------------------------
55
- # 5๏ธโƒฃ Determine zone index for y
56
- # ------------------------------------------------------------
57
def get_zone_idx(y, frame_h, n_zones):
    """Map a y coordinate to a horizontal-band zone index in [0, n_zones - 1]."""
    band_height = frame_h / n_zones
    raw_idx = y // band_height
    # Clip so out-of-frame coordinates still land in a valid zone.
    return int(np.clip(raw_idx, 0, n_zones - 1))
60
-
61
- # ------------------------------------------------------------
62
- # 6๏ธโƒฃ Confidence mapping
63
- # ------------------------------------------------------------
64
def angle_to_confidence(angle):
    """Linearly map an angular deviation (degrees) to a confidence percentage.

    Mapping: 0° → 100%, 180° → 0% (conf = 100 - angle/180 * 100).
    NOTE: the previous docstring claimed ANGLE_THRESHOLD° → 50%, which did
    not match the code — at 60° this mapping yields ~66.7%. The docstring is
    corrected here; the behavior is unchanged.
    Out-of-range angles (< 0 or >= 180) return CONF_MIN.
    """
    if angle < 0:
        return CONF_MIN
    if angle >= 180:
        return CONF_MIN
    # Linear mapping: smaller angle = higher confidence.
    conf = max(CONF_MIN, CONF_MAX - (angle / 180) * 100)
    return round(conf, 1)
77
 
78
  # ------------------------------------------------------------
79
- # 7๏ธโƒฃ Main logic
80
  # ------------------------------------------------------------
81
def classify_wrong_direction(traj_json, flow_model_json, bg_img=None):
    """Classify each track as OK/WRONG against its zone's dominant flow.

    Parameters
    ----------
    traj_json : str
        Path to Stage 1 trajectories JSON.
    flow_model_json : str
        Path to Stage 2 flow-model JSON.
    bg_img : str, optional
        Path to a background frame; a dark canvas is used when absent.

    Returns
    -------
    tuple[str, list[dict]]
        Path to the annotated JPEG and per-track result dicts.
    """
    tracks = extract_trajectories(traj_json)
    centers_by_zone = load_flow_model(flow_model_json)

    bg = None
    if bg_img and os.path.exists(bg_img):
        bg = cv2.imread(bg_img)
    if bg is None:
        # BUGFIX: cv2.imread returns None for unreadable/corrupt images even
        # when the path exists; fall back to a dark canvas instead of crashing.
        bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
    h, w = bg.shape[:2]

    overlay = bg.copy()
    font = cv2.FONT_HERSHEY_SIMPLEX
    results = []

    for tid, pts in tracks.items():
        if len(pts) < 3:
            continue
        cur_pt = pts[-1]
        y = cur_pt[1]

        # Skip the entry region to avoid false positives near the frame edge.
        if y < h * ENTRY_ZONE_RATIO:
            continue
        zone_idx = get_zone_idx(y, h, len(centers_by_zone))

        # Compare the smoothed track direction against every dominant flow
        # vector of the track's zone; keep the best (smallest) deviation.
        v = smooth_direction(pts)
        angles = [angle_between(v, c) for c in centers_by_zone[zone_idx]]
        best_angle = min(angles)

        conf = angle_to_confidence(best_angle)
        label = "OK" if best_angle < ANGLE_THRESHOLD else "WRONG"
        color = (0, 255, 0) if label == "OK" else (0, 0, 255)

        # Draw the full trajectory, current position and label.
        for p1, p2 in zip(pts[:-1], pts[1:]):
            cv2.line(overlay, tuple(p1.astype(int)), tuple(p2.astype(int)), color, 2)
        cv2.circle(overlay, tuple(cur_pt.astype(int)), 5, color, -1)
        cv2.putText(
            overlay,
            f"ID:{tid} {label} ({conf}%)",
            (int(cur_pt[0]) + 5, int(cur_pt[1]) - 5),
            font, 0.6, color, 2
        )

        results.append({
            "id": tid,
            "zone": int(zone_idx),
            "angle": round(best_angle, 1),
            "confidence": conf,
            "label": label
        })

    combined = cv2.addWeighted(bg, 0.6, overlay, 0.4, 0)
    out_path = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False).name
    cv2.imwrite(out_path, combined)
    return out_path, results
139
 
140
# ------------------------------------------------------------
# 🖥️ Gradio Interface
# ------------------------------------------------------------
description_text = """
### 🚦 Wrong-Direction Detection (Stage 3 — with Confidence)
- Compares each vehicle’s motion to its zone’s dominant flow.
- Uses angular difference → smaller angle ⇒ higher confidence.
- Ignores entry region to avoid false positives.
- Displays ID, label, and confidence percentage.
"""

# Two required JSON inputs (Stage 1 + Stage 2 outputs) and an optional
# background frame; the app returns an annotated image plus per-vehicle JSON.
demo = gr.Interface(
    fn=classify_wrong_direction,
    title="🚗 Stage 3 — Wrong-Direction Detection (with Confidence)",
    description=description_text,
    inputs=[
        gr.File(label="Trajectories JSON (Stage 1)"),
        gr.File(label="Flow Model JSON (Stage 2)"),
        gr.File(label="Optional background frame (.jpg)"),
    ],
    outputs=[
        gr.Image(label="Annotated Output"),
        gr.JSON(label="Per-Vehicle Results"),
    ],
)
 
 
1
  # ============================================================
2
+ # ๐Ÿšฆ Stage 3 โ€“ Wrong-Direction Detection (Video Output Version)
 
3
  # ============================================================
4
 
5
  import gradio as gr
6
  import numpy as np, cv2, json, os, tempfile
7
  from collections import defaultdict
8
+ import math
9
 
10
# ------------------------------------------------------------
# ⚙️ CONFIG
# ------------------------------------------------------------
ANGLE_THRESHOLD = 60        # degrees; deviations above this are flagged WRONG
SMOOTH_FRAMES = 5           # window size (frames) for temporal smoothing
ENTRY_ZONE_RATIO = 0.15     # top 15% of the frame is the entry region (skipped)
CONF_MIN, CONF_MAX = 0, 100  # confidence percentage bounds
FPS = 25                    # frame rate of the rendered output video
18
 
19
# ------------------------------------------------------------
# 🔧 Helper – universal loader for Gradio file inputs
# ------------------------------------------------------------
def load_json_input(file_obj):
    """Read JSON from any shape of Gradio file input.

    Accepts a dict carrying a "name" key, an object exposing a `.name`
    attribute (tempfile wrapper), or a plain path string.

    Returns
    -------
    The parsed JSON value.

    Raises
    ------
    ValueError
        If `file_obj` is None or of an unsupported type.
    """
    if file_obj is None:
        raise ValueError("No file provided.")
    # Resolve all accepted input shapes to a single filesystem path first,
    # then open it once — the original opened the file in three branches
    # via `json.load(open(...))`, leaking the handle each time.
    if isinstance(file_obj, dict) and "name" in file_obj:
        path = file_obj["name"]
    elif hasattr(file_obj, "name"):
        path = file_obj.name
    elif isinstance(file_obj, str):
        path = file_obj
    else:
        raise ValueError("Unsupported file input type.")
    with open(path) as f:
        return json.load(f)
34
+
35
# ------------------------------------------------------------
# 🧩 Load Stage 2 flow model
# ------------------------------------------------------------
def load_flow_model(flow_model_json):
    """Return the per-zone dominant-flow centers from a Stage 2 model file."""
    model = load_json_input(flow_model_json)
    return [np.array(zone) for zone in model["zone_flow_centers"]]
42
 
43
# ------------------------------------------------------------
# 🧩 Extract trajectories (Stage 1)
# ------------------------------------------------------------
def extract_trajectories(json_file):
    """Load Stage 1 trajectories, keeping only tracks with more than 2 points."""
    data = load_json_input(json_file)
    return {tid: np.array(pts) for tid, pts in data.items() if len(pts) > 2}
50
 
51
# ------------------------------------------------------------
# 🧮 Direction + Angle + Confidence Helpers
# ------------------------------------------------------------
def smooth_direction(pts, window=SMOOTH_FRAMES):
    """Unit-length mean direction over the last `window` points of a track.

    Returns the zero vector when fewer than two points are available.
    """
    if len(pts) < 2:
        return np.array([0, 0])
    recent = pts[-window:]
    mean_step = np.diff(recent, axis=0).mean(axis=0)
    # Epsilon keeps the division safe for a (near-)stationary track.
    return mean_step / (np.linalg.norm(mean_step) + 1e-6)
 
59
 
 
 
 
60
def angle_between(v1, v2):
    """Angle in degrees between two vectors (epsilon-safe normalization)."""
    u = v1 / (np.linalg.norm(v1) + 1e-6)
    w = v2 / (np.linalg.norm(v2) + 1e-6)
    # Clip guards arccos against tiny floating-point overshoot.
    cosang = np.clip(np.dot(u, w), -1, 1)
    return np.degrees(np.arccos(cosang))
65
 
 
 
 
 
 
 
 
 
 
 
66
def angle_to_confidence(angle):
    """Map an angular deviation (deg) to a confidence %: 0° → 100, 180° → 0.

    Out-of-range angles (< 0 or >= 180) return CONF_MIN.
    """
    if not (0 <= angle < 180):
        return CONF_MIN
    scaled = CONF_MAX - (angle / 180) * 100
    return round(max(CONF_MIN, scaled), 1)
71
+
72
def get_zone_idx(y, frame_h, n_zones):
    """Map a y coordinate to a horizontal-band zone index in [0, n_zones - 1]."""
    band_height = frame_h / n_zones
    raw_idx = y // band_height
    # Clip so out-of-frame coordinates still land in a valid zone.
    return int(np.clip(raw_idx, 0, n_zones - 1))
 
 
 
 
75
 
76
# ------------------------------------------------------------
# 🎥 Main logic → annotated video
# ------------------------------------------------------------
def classify_wrong_direction_video(traj_json, flow_model_json, bg_img=None):
    """Render an annotated MP4 of per-frame wrong-direction classification.

    Parameters
    ----------
    traj_json : Gradio file input
        Stage 1 trajectories JSON.
    flow_model_json : Gradio file input
        Stage 2 flow-model JSON.
    bg_img : Gradio file input, optional
        Background frame; a dark canvas is used when absent/unreadable.

    Returns
    -------
    str
        Path to the written MP4 file.
    """
    tracks = extract_trajectories(traj_json)
    centers_by_zone = load_flow_model(flow_model_json)

    # Resolve the background from any accepted Gradio input shape.
    bg = None
    if bg_img:
        if isinstance(bg_img, dict) and "name" in bg_img:
            bg_path = bg_img["name"]
        elif hasattr(bg_img, "name"):
            bg_path = bg_img.name
        else:
            bg_path = bg_img
        bg = cv2.imread(bg_path)
    if bg is None:
        # No background or cv2.imread failed → dark canvas.
        bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
    h, w = bg.shape[:2]

    # BUGFIX: max() over an empty sequence raises ValueError when no track
    # survives the len > 2 filter; default=0 produces an empty (valid) video.
    max_len = max((len(p) for p in tracks.values()), default=0)

    out_path = tempfile.NamedTemporaryFile(suffix=".mp4", delete=False).name
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(out_path, fourcc, FPS, (w, h))
    font = cv2.FONT_HERSHEY_SIMPLEX

    # BUGFIX: release the writer even if rendering raises, so the partial
    # MP4 is finalized and the OS handle is not leaked.
    try:
        for fi in range(max_len):
            frame = bg.copy()
            for tid, pts in tracks.items():
                if fi >= len(pts):
                    continue  # this track has already ended
                cur_pt = pts[fi]
                y = cur_pt[1]
                # Entry-zone gating: skip detections near the top edge.
                if y < h * ENTRY_ZONE_RATIO:
                    continue
                zone_idx = get_zone_idx(y, h, len(centers_by_zone))

                # Causal smoothing: direction from the recent window only.
                win = pts[max(0, fi - SMOOTH_FRAMES):fi + 1]
                v = smooth_direction(win)
                best_angle = min(angle_between(v, c) for c in centers_by_zone[zone_idx])
                conf = angle_to_confidence(best_angle)
                label = "OK" if best_angle < ANGLE_THRESHOLD else "WRONG"
                color = (0, 255, 0) if label == "OK" else (0, 0, 255)

                # Draw the trajectory only up to the current frame.
                for p1, p2 in zip(pts[:fi], pts[1:fi + 1]):
                    cv2.line(frame, tuple(p1.astype(int)), tuple(p2.astype(int)), color, 2)
                cv2.circle(frame, tuple(cur_pt.astype(int)), 5, color, -1)
                cv2.putText(frame, f"ID:{tid} {label} ({conf}%)",
                            (int(cur_pt[0]) + 5, int(cur_pt[1]) - 5),
                            font, 0.55, color, 2)

            writer.write(frame)
    finally:
        writer.release()
    return out_path
 
 
 
 
 
 
 
 
 
 
 
 
 
136
 
137
# ------------------------------------------------------------
# 🖥️ Gradio Interface
# ------------------------------------------------------------
description_text = """
### 🚦 Stage 3 – Wrong-Direction Detection (Video Output)
Uses **trajectories (Stage 1)** + **flow model (Stage 2)** to create an annotated MP4:
- Angle-based + temporal smoothing + zone awareness
- Entry-zone gating
- Confidence (%) per vehicle
"""

# Two required JSON inputs (Stage 1 + Stage 2 outputs) and an optional
# background frame; the app returns a single annotated MP4 clip.
demo = gr.Interface(
    fn=classify_wrong_direction_video,
    title="🚗 Stage 3 — Wrong-Direction Detection (Video Output)",
    description=description_text,
    inputs=[
        gr.File(label="Trajectories JSON (Stage 1)"),
        gr.File(label="Flow Model JSON (Stage 2)"),
        gr.File(label="Optional background frame (.jpg/.png)"),
    ],
    outputs=gr.Video(label="Annotated Video Output"),
)
159