File size: 7,996 Bytes
0f26f4e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
import numpy as np

def detect_phases(depth_series):
    phases = []
    max_idx = np.argmax(depth_series)
    for i in range(len(depth_series)):
        if i < max_idx - 2:
            phases.append("down")
        elif i <= max_idx + 2:
            phases.append("bottom")
        else:
            phases.append("up")
    return phases


class Rule:
    def __init__(self, name, severity):
        self.name = name
        self.severity = severity
        self.description = ""

    def check(self, frame, context=None):
        return None


# === FRAME-LEVEL RULES (kiểm tra từng frame) ===

class HipSagRule(Rule):
    """So sánh hip_angle với mentor. Chỉ bắt lỗi khi lệch > 15° dưới mức mentor."""
    def __init__(self):
        super().__init__("hip_sag", "high")
        self.description = "Lưng bị võng xuống"

    def check(self, frame, context=None):
        mentor_avg = context.get("mentor_hip_angle", 165.0) if context else 165.0
        # Widen from 15 to 22 degrees below mentor to prevent false positives for different camera angles
        if frame["hip_angle"] < mentor_avg - 22:
            return {
                "type": self.name, "message": self.description,
                "frame_idx": frame["frame_idx"], "severity": self.severity
            }
        return None


class PikeRule(Rule):
    """Phát hiện mông nhô cao (pike) bằng depth_sig.
    depth_sig = shoulder_y - hip_y. Dương = vai thấp hơn hông = mông cao.
    So sánh trung bình depth_sig của học viên với mentor."""
    def __init__(self):
        super().__init__("hip_pike", "high")
        self.description = "Nhô mông quá cao"

    def check(self, frame, context=None):
        # Không check per-frame nữa, dùng check_rep
        return None

    def check_rep(self, frames, context=None):
        mentor_depth_sig = context.get("mentor_depth_sig_mean", -0.015) if context else -0.015
        
        student_depth_sigs = [f["depth_sig"] for f in frames]
        student_mean = np.mean(student_depth_sigs)
        
        # Nếu depth_sig trung bình của học viên dương hơn mentor > 0.03 → mông cao
        # Template: ~-0.015, Mông cao: ~+0.04, Sai biệt: ~0.055
        if student_mean > mentor_depth_sig + 0.03:
            worst = max(frames, key=lambda f: f["depth_sig"])
            return {
                "type": self.name, "message": self.description,
                "severity": self.severity, "frames": [worst["frame_idx"]]
            }
        return None


class BodyAlignmentRule(Rule):
    """So sánh body_line_angle với mentor. Chỉ bắt khi body_line thấp hơn mentor
    VÀ depth_sig âm (loại bỏ trường hợp mông cao đã bắt bởi PikeRule)."""
    def __init__(self):
        super().__init__("body_not_straight", "high")
        self.description = "Cơ thể không giữ thẳng (Vai-Hông-Gót)"

    def check(self, frame, context=None):
        mentor_avg = context.get("mentor_body_line_angle", 172.0) if context else 172.0
        # Chỉ bắt khi body_line thấp hơn mentor > 20° VÀ depth_sig âm (không phải pike)
        if frame["body_line_angle"] < mentor_avg - 20 and frame["depth_sig"] < 0:
            return {
                "type": self.name, "message": self.description,
                "frame_idx": frame["frame_idx"], "severity": self.severity
            }
        return None


# === REP-LEVEL RULES (kiểm tra toàn bộ rep 1 lần) ===

class DepthRule(Rule):
    """So sánh MIN elbow angle trong toàn bộ rep với mentor's min.
    Không check per-frame nữa để tránh bắt frame đang xuống dở."""
    def __init__(self):
        super().__init__("not_deep_enough", "high")
        self.description = "Chưa hạ người đủ sâu"

    def check_rep(self, frames, context=None):
        mentor_min_elbow = context.get("mentor_min_elbow", 75.0) if context else 75.0
        student_min_elbow = min(f["elbow_angle"] for f in frames)
        # Cho phép lệch 45° so với mentor (tính đến ROM khác nhau và camera)
        if student_min_elbow > mentor_min_elbow + 45:
            worst = min(frames, key=lambda f: f["elbow_angle"])
            return {
                "type": self.name, "message": self.description,
                "severity": self.severity, "frames": [worst["frame_idx"]]
            }
        return None


class HeadMisalignedRule(Rule):
    """Phát hiện cúi đầu bằng cách kiểm tra khoảng cách rơi của đầu so với vai."""
    def __init__(self):
        super().__init__("head_misaligned", "medium")
        self.description = "Gập cổ hoặc cúi đầu quá mức"

    def check_rep(self, frames, context=None):
        # Mặc định max_drop của mentor khoảng 0.085, cộng thêm biên độ sai số (threshold)
        mentor_max_drop = context.get("mentor_max_head_drop", 0.085) if context else 0.085
        
        student_drops = [f.get("head_drop_norm", 0) for f in frames]
        max_drop = max(student_drops)

        # Nếu đầu gập/rơi xuống quá sâu so với biên độ của mentor (thêm 0.02 để tránh false positive)
        # Các rep tập đúng của học viên thường có max_drop < 0.05
        # Các rep cúi đầu sẽ có max_drop từ 0.08 đến 0.13
        if max_drop > mentor_max_drop + 0.02:
            worst = max(frames, key=lambda f: f.get("head_drop_norm", 0))
            return {
                "type": self.name, "message": self.description,
                "severity": self.severity, "frames": [worst["frame_idx"]]
            }
        return None


class PushUpRuleEngine:
    def __init__(self, mentor_context=None):
        self.mentor_context = mentor_context or {}
        self.frame_rules = [
            HipSagRule(),
            BodyAlignmentRule(),
        ]
        self.rep_rules = [
            DepthRule(),
            HeadMisalignedRule(),
            PikeRule(),
        ]

    def evaluate_rep(self, frames):
        depth_series = [f["depth_sig"] * 100 for f in frames]
        phases = detect_phases(depth_series)
        for i, f in enumerate(frames):
            f["phase"] = phases[i]

        # Frame-level rules → aggregate
        frame_errors = []
        for f in frames:
            for rule in self.frame_rules:
                res = rule.check(f, context=self.mentor_context)
                if res:
                    frame_errors.append(res)

        results = self.aggregate(frame_errors, total_frames=len(frames))

        # Rep-level rules → add directly (đã tự aggregate)
        for rule in self.rep_rules:
            res = rule.check_rep(frames, context=self.mentor_context)
            if res:
                results.append(res)

        return results

    def aggregate(self, errors, total_frames):
        grouped = {}
        for e in errors:
            if e["type"] not in grouped:
                grouped[e["type"]] = {
                    "message": e["message"], "severity": e["severity"], "frames": []
                }
            grouped[e["type"]]["frames"].append(e["frame_idx"])

        results = []
        for err_type, data in grouped.items():
            threshold_ratio = 0.2
            if len(data["frames"]) / total_frames > threshold_ratio:
                step = max(1, len(data["frames"]) // 3)
                sample_frames = data["frames"][::step][:3]
                results.append({
                    "type": err_type, "message": data["message"],
                    "severity": data["severity"], "frames": sample_frames
                })
        return results

    def calculate_score(self, aggregated_errors):
        score = 1.0
        for err in aggregated_errors:
            if err["severity"] == "high":
                score -= 0.20
            elif err["severity"] == "medium":
                score -= 0.10
            elif err["severity"] == "low":
                score -= 0.05
        return max(0.0, score)