File size: 5,932 Bytes
e02542b
 
 
 
 
 
 
 
9453af9
 
e02542b
888a4bc
e02542b
 
 
 
888a4bc
 
e02542b
9453af9
e02542b
 
 
 
9453af9
 
e02542b
9453af9
e02542b
 
 
 
9453af9
 
e02542b
9453af9
e02542b
 
 
 
 
 
 
888a4bc
 
e02542b
888a4bc
e02542b
 
 
 
 
888a4bc
9453af9
e02542b
 
 
 
 
9453af9
 
e02542b
9453af9
e02542b
 
 
 
 
 
 
 
 
 
 
 
 
888a4bc
e02542b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
9453af9
e02542b
 
 
9453af9
e02542b
 
 
 
 
9453af9
 
 
e02542b
888a4bc
e02542b
 
 
888a4bc
 
e02542b
 
888a4bc
e02542b
 
9453af9
 
 
e02542b
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# ============================================================
# 🚦 Stage 3 – Wrong-Direction Detection
#     (Angle + Temporal + Zone-Aware + Entry Gating + Confidence)
# ============================================================

import gradio as gr
import numpy as np, cv2, json, os, tempfile
from collections import defaultdict

# ------------------------------------------------------------
# ⚙️ CONFIG
# ------------------------------------------------------------
ANGLE_THRESHOLD = 60        # degrees → above this = WRONG
SMOOTH_FRAMES = 5           # frames for temporal smoothing
ENTRY_ZONE_RATIO = 0.15     # top 15% = entry region (skip)
CONF_MIN, CONF_MAX = 0, 100

# ------------------------------------------------------------
# 1️⃣ Load flow model (Stage 2)
# ------------------------------------------------------------
def load_flow_model(flow_model_json):
    model = json.load(open(flow_model_json))
    centers = [np.array(z) for z in model["zone_flow_centers"]]
    return centers

# ------------------------------------------------------------
# 2️⃣ Extract trajectories
# ------------------------------------------------------------
def extract_trajectories(json_file):
    data = json.load(open(json_file))
    tracks = {tid: np.array(pts) for tid, pts in data.items() if len(pts) > 2}
    return tracks

# ------------------------------------------------------------
# 3️⃣ Smoothed direction for a trajectory
# ------------------------------------------------------------
def smooth_direction(pts, window=SMOOTH_FRAMES):
    if len(pts) < 2:
        return np.array([0, 0])
    diffs = np.diff(pts[-window:], axis=0)
    v = np.mean(diffs, axis=0)
    n = np.linalg.norm(v)
    return v / (n + 1e-6)

# ------------------------------------------------------------
# 4️⃣ Compute angular difference (deg)
# ------------------------------------------------------------
def angle_between(v1, v2):
    v1 = v1 / (np.linalg.norm(v1) + 1e-6)
    v2 = v2 / (np.linalg.norm(v2) + 1e-6)
    cosang = np.clip(np.dot(v1, v2), -1, 1)
    return np.degrees(np.arccos(cosang))

# ------------------------------------------------------------
# 5️⃣ Determine zone index for y
# ------------------------------------------------------------
def get_zone_idx(y, frame_h, n_zones):
    zone_height = frame_h / n_zones
    return int(np.clip(y // zone_height, 0, n_zones - 1))

# ------------------------------------------------------------
# 6️⃣ Confidence mapping
# ------------------------------------------------------------
def angle_to_confidence(angle):
    """
    0° → 100% confidence
    ANGLE_THRESHOLD° → 50%
    180° → 0%
    """
    if angle < 0:
        return CONF_MIN
    if angle >= 180:
        return CONF_MIN
    # linear mapping: smaller angle = higher confidence
    conf = max(CONF_MIN, CONF_MAX - (angle / 180) * 100)
    return round(conf, 1)

# ------------------------------------------------------------
# 7️⃣ Main logic
# ------------------------------------------------------------
def classify_wrong_direction(traj_json, flow_model_json, bg_img=None):
    tracks = extract_trajectories(traj_json)
    centers_by_zone = load_flow_model(flow_model_json)

    if bg_img and os.path.exists(bg_img):
        bg = cv2.imread(bg_img)
    else:
        bg = np.ones((600, 900, 3), dtype=np.uint8) * 40
    h, w = bg.shape[:2]

    overlay = bg.copy()
    font = cv2.FONT_HERSHEY_SIMPLEX
    results = []

    for tid, pts in tracks.items():
        if len(pts) < 3:
            continue
        cur_pt = pts[-1]
        y = cur_pt[1]
        zone_idx = get_zone_idx(y, h, len(centers_by_zone))

        # Skip entry region
        if y < h * ENTRY_ZONE_RATIO:
            continue

        v = smooth_direction(pts)
        centers = centers_by_zone[zone_idx]
        angles = [angle_between(v, c) for c in centers]
        best_angle = min(angles)

        # Confidence & label
        conf = angle_to_confidence(best_angle)
        label = "OK" if best_angle < ANGLE_THRESHOLD else "WRONG"
        color = (0, 255, 0) if label == "OK" else (0, 0, 255)

        # Draw trajectory & label
        for p1, p2 in zip(pts[:-1], pts[1:]):
            cv2.line(overlay, tuple(p1.astype(int)), tuple(p2.astype(int)), color, 2)
        cv2.circle(overlay, tuple(cur_pt.astype(int)), 5, color, -1)
        cv2.putText(
            overlay,
            f"ID:{tid} {label} ({conf}%)",
            (int(cur_pt[0]) + 5, int(cur_pt[1]) - 5),
            font, 0.6, color, 2
        )

        results.append({
            "id": tid,
            "zone": int(zone_idx),
            "angle": round(best_angle, 1),
            "confidence": conf,
            "label": label
        })

    combined = cv2.addWeighted(bg, 0.6, overlay, 0.4, 0)
    out_path = tempfile.NamedTemporaryFile(suffix=".jpg", delete=False).name
    cv2.imwrite(out_path, combined)
    return out_path, results

# ------------------------------------------------------------
# 🖥️ Gradio Interface
# ------------------------------------------------------------
description_text = """
### 🚦 Wrong-Direction Detection (Stage 3 — with Confidence)
- Compares each vehicle’s motion to its zone’s dominant flow.  
- Uses angular difference → smaller angle ⇒ higher confidence.  
- Ignores entry region to avoid false positives.  
- Displays ID, label, and confidence percentage.
"""

demo = gr.Interface(
    fn=classify_wrong_direction,
    inputs=[
        gr.File(label="Trajectories JSON (Stage 1)"),
        gr.File(label="Flow Model JSON (Stage 2)"),
        gr.File(label="Optional background frame (.jpg)")
    ],
    outputs=[
        gr.Image(label="Annotated Output"),
        gr.JSON(label="Per-Vehicle Results")
    ],
    title="🚗 Stage 3 — Wrong-Direction Detection (with Confidence)",
    description=description_text
)

if __name__ == "__main__":
    demo.launch()