File size: 6,097 Bytes
13bfea5
b5ff74a
13bfea5
 
 
 
 
b5ff74a
9453af9
 
13bfea5
888a4bc
b5ff74a
 
 
13bfea5
b5ff74a
888a4bc
 
b5ff74a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9453af9
13bfea5
b5ff74a
13bfea5
 
9453af9
 
b5ff74a
9453af9
13bfea5
b5ff74a
13bfea5
 
9453af9
 
b5ff74a
9453af9
13bfea5
b5ff74a
13bfea5
 
b5ff74a
888a4bc
13bfea5
b5ff74a
 
 
13bfea5
888a4bc
13bfea5
b5ff74a
 
 
 
 
 
 
 
888a4bc
13bfea5
b5ff74a
13bfea5
b5ff74a
13bfea5
 
 
b5ff74a
 
 
 
 
 
 
 
 
13bfea5
b5ff74a
 
 
 
 
 
 
 
 
13bfea5
b5ff74a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9453af9
13bfea5
 
 
9453af9
b5ff74a
 
 
 
 
9453af9
 
 
b5ff74a
888a4bc
13bfea5
 
b5ff74a
888a4bc
b5ff74a
 
13bfea5
9453af9
 
 
13bfea5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
# ============================================================
# ๐Ÿšฆ Stage 3 โ€“ Wrong-Direction Detection (Video Output Version)
# ============================================================

import gradio as gr
import numpy as np, cv2, json, os, tempfile
from collections import defaultdict
import math

# ------------------------------------------------------------
# ⚙️ CONFIG — tuning knobs for the wrong-direction classifier
# ------------------------------------------------------------
ANGLE_THRESHOLD = 60        # deg: deviation from zone flow above this → labeled "WRONG"
SMOOTH_FRAMES   = 5         # temporal smoothing window (frames) for heading estimation
ENTRY_ZONE_RATIO = 0.15     # skip detections in the top 15 % of the frame (entry zone)
CONF_MIN, CONF_MAX = 0, 100 # confidence bounds (%) used by angle_to_confidence
FPS = 25                    # output video frames per second

# ------------------------------------------------------------
# ๐Ÿ”ง Helper โ€“ universal loader for Gradio inputs
# ------------------------------------------------------------
def load_json_input(file_obj):
    """Load JSON from any shape of Gradio file input.

    Gradio versions differ in what a ``gr.File`` delivers to the callback:
    a dict with a ``"name"`` key, an object exposing ``.name``, or a plain
    path string.  Resolve each to a filesystem path, then parse the JSON.

    Args:
        file_obj: dict / file-like object / str path from a ``gr.File`` input.

    Returns:
        The parsed JSON object.

    Raises:
        ValueError: if ``file_obj`` is None or of an unsupported type.
    """
    if file_obj is None:
        raise ValueError("No file provided.")
    if isinstance(file_obj, dict) and "name" in file_obj:
        path = file_obj["name"]
    elif hasattr(file_obj, "name"):
        path = file_obj.name
    elif isinstance(file_obj, str):
        path = file_obj
    else:
        raise ValueError("Unsupported file input type.")
    # `with` closes the handle deterministically (original leaked open() handles).
    with open(path) as f:
        return json.load(f)

# ------------------------------------------------------------
# ๐Ÿงฉ Load Stage 2 flow model
# ------------------------------------------------------------
def load_flow_model(flow_model_json):
    """Read the Stage 2 flow model and return its per-zone flow-center vectors.

    Args:
        flow_model_json: flow-model JSON in any Gradio file-input shape.

    Returns:
        List (one entry per zone) of numpy arrays of flow-center directions.
    """
    model_data = load_json_input(flow_model_json)
    return list(map(np.array, model_data["zone_flow_centers"]))

# ------------------------------------------------------------
# ๐Ÿงฉ Extract trajectories (Stage 1)
# ------------------------------------------------------------
def extract_trajectories(json_file):
    """Parse Stage 1 output into per-track point arrays.

    Tracks with fewer than 3 points are dropped — too short to estimate
    a direction from.

    Args:
        json_file: trajectories JSON in any Gradio file-input shape.

    Returns:
        Dict mapping track id to an (N, 2) numpy array of positions.
    """
    raw = load_json_input(json_file)
    trajectories = {}
    for track_id, points in raw.items():
        if len(points) > 2:
            trajectories[track_id] = np.array(points)
    return trajectories

# ------------------------------------------------------------
# ๐Ÿงฎ Direction + Angle + Confidence Helpers
# ------------------------------------------------------------
def smooth_direction(pts, window=SMOOTH_FRAMES):
    """Mean heading over the last `window` points, as a (near-)unit vector.

    Returns the zero vector when fewer than two points are available.
    """
    if len(pts) < 2:
        return np.array([0, 0])
    recent = pts[-window:]
    mean_step = np.mean(np.diff(recent, axis=0), axis=0)
    # Epsilon keeps the division safe for a degenerate (stationary) track.
    return mean_step / (np.linalg.norm(mean_step) + 1e-6)

def angle_between(v1, v2):
    """Unsigned angle in degrees between two vectors.

    Both vectors are normalized with a small epsilon so zero-length
    inputs do not divide by zero; the cosine is clipped into [-1, 1]
    before arccos to avoid NaNs from float round-off.
    """
    eps = 1e-6
    unit_a = v1 / (np.linalg.norm(v1) + eps)
    unit_b = v2 / (np.linalg.norm(v2) + eps)
    cos_ang = np.dot(unit_a, unit_b)
    cos_ang = np.clip(cos_ang, -1, 1)
    return np.degrees(np.arccos(cos_ang))

def angle_to_confidence(angle):
    """Map an angle (degrees) to a confidence percentage.

    Angles outside [0, 180) yield CONF_MIN; otherwise confidence decays
    linearly from CONF_MAX at 0° toward CONF_MIN at 180°, rounded to one
    decimal place.
    """
    if not (0 <= angle < 180):
        return CONF_MIN
    scaled = CONF_MAX - (angle / 180) * 100
    return round(max(CONF_MIN, scaled), 1)

def get_zone_idx(y, frame_h, n_zones):
    """Return which horizontal zone (0 .. n_zones-1) the y coordinate falls in.

    The frame is divided into `n_zones` equal-height bands; out-of-frame
    coordinates are clamped to the nearest valid zone.
    """
    band_height = frame_h / n_zones
    raw_idx = y // band_height
    return int(np.clip(raw_idx, 0, n_zones - 1))

# ------------------------------------------------------------
# ๐ŸŽฅ Main logic โ†’ annotated video
# ------------------------------------------------------------
def classify_wrong_direction_video(traj_json, flow_model_json, bg_img=None):
    """Render an annotated MP4 classifying each track as OK / WRONG direction.

    For every frame, each active track's smoothed heading is compared
    against the allowed flow directions of the zone it occupies; the best
    (smallest) angle decides the label and the confidence.

    Args:
        traj_json: Stage 1 trajectories file (any Gradio file-input shape).
        flow_model_json: Stage 2 flow model file (any Gradio file-input shape).
        bg_img: optional background frame; falls back to a dark canvas.

    Returns:
        Path to the written .mp4 file.
    """
    tracks = extract_trajectories(traj_json)
    centers_by_zone = load_flow_model(flow_model_json)

    # Resolve the background: accept dict / file-like / path; fall back to
    # a dark canvas when no image is given or imread fails.
    bg = None
    if bg_img:
        if isinstance(bg_img, dict) and "name" in bg_img:
            bg_path = bg_img["name"]
        elif hasattr(bg_img, "name"):
            bg_path = bg_img.name
        else:
            bg_path = bg_img
        bg = cv2.imread(bg_path)
    if bg is None:
        bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
    h, w = bg.shape[:2]

    # Video length = longest track; default=0 avoids ValueError when the
    # trajectories file contains no usable tracks (original crashed here).
    max_len = max((len(p) for p in tracks.values()), default=0)

    # mkstemp instead of NamedTemporaryFile(delete=False): same persistent
    # temp file, but without leaking an open file handle.
    fd, out_path = tempfile.mkstemp(suffix=".mp4")
    os.close(fd)
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    writer = cv2.VideoWriter(out_path, fourcc, FPS, (w, h))
    font = cv2.FONT_HERSHEY_SIMPLEX

    try:
        # Render frame-by-frame.
        for fi in range(max_len):
            frame = bg.copy()
            for tid, pts in tracks.items():
                if fi >= len(pts):
                    continue  # this track has already ended
                cur_pt = pts[fi]
                y = cur_pt[1]
                # Entry-zone gating: ignore detections near the top edge
                # (checked first so we skip the zone/angle work entirely).
                if y < h * ENTRY_ZONE_RATIO:
                    continue
                zone_idx = get_zone_idx(y, h, len(centers_by_zone))

                # Smoothed heading over the trailing window of positions.
                win = pts[max(0, fi - SMOOTH_FRAMES):fi + 1]
                v = smooth_direction(win)
                # Best (smallest) angle against any allowed flow direction
                # of this zone decides the label.
                best_angle = min(angle_between(v, c) for c in centers_by_zone[zone_idx])
                conf = angle_to_confidence(best_angle)
                label = "OK" if best_angle < ANGLE_THRESHOLD else "WRONG"
                color = (0, 255, 0) if label == "OK" else (0, 0, 255)

                # Draw the trajectory travelled so far, then the current point.
                for p1, p2 in zip(pts[:fi], pts[1:fi + 1]):
                    cv2.line(frame, tuple(p1.astype(int)), tuple(p2.astype(int)), color, 2)
                cv2.circle(frame, tuple(cur_pt.astype(int)), 5, color, -1)
                cv2.putText(frame, f"ID:{tid} {label} ({conf}%)",
                            (int(cur_pt[0]) + 5, int(cur_pt[1]) - 5),
                            font, 0.55, color, 2)

            writer.write(frame)
    finally:
        # Always finalize the container, even if rendering raised
        # (original skipped release() on any exception).
        writer.release()
    return out_path

# ------------------------------------------------------------
# ๐Ÿ–ฅ๏ธ Gradio Interface
# ------------------------------------------------------------
description_text = """
### ๐Ÿšฆ Stage 3 โ€“ Wrong-Direction Detection (Video Output)
Uses **trajectories (Stage 1)** + **flow model (Stage 2)** to create an annotated MP4:  
- Angle-based + temporal smoothing + zone awareness  
- Entry-zone gating  
- Confidence (%) per vehicle
"""

demo = gr.Interface(
    fn=classify_wrong_direction_video,
    inputs=[
        gr.File(label="Trajectories JSON (Stage 1)"),
        gr.File(label="Flow Model JSON (Stage 2)"),
        gr.File(label="Optional background frame (.jpg/.png)")
    ],
    outputs=gr.Video(label="Annotated Video Output"),
    title="๐Ÿš— Stage 3 โ€” Wrong-Direction Detection (Video Output)",
    description=description_text
)

if __name__ == "__main__":
    demo.launch()