# File size: 22,761 Bytes
# 83ee618
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
import cv2
import numpy as np
import os
from ultralytics import YOLO
from collections import deque

class EnhancedGaitAnalyzer:
    def __init__(self, model_name="yolo11n-pose.pt", age=4, independent_walking=True):
        """Create the analyzer: pose model, subject context and thresholds.

        Args:
            model_name: Weights identifier handed unchanged to ``YOLO``.
            age: Stored on the instance as ``self.age``.
            independent_walking: Stored as ``self.independent_walking``; read by
                ``analyze_gait`` for the safety note and the reported
                walking condition.
        """
        self.model = YOLO(model_name)
        self.age = age
        self.independent_walking = independent_walking

        # COCO 17-keypoint layout: list position == keypoint index.
        keypoint_names = [
            "Nose", "LEye", "REye", "LEar", "REar",
            "LShoulder", "RShoulder", "LElbow", "RElbow",
            "LWrist", "RWrist", "LHip", "RHip",
            "LKnee", "RKnee", "LAnkle", "RAnkle",
        ]
        self.KEYPOINT_MAP = dict(enumerate(keypoint_names))

        # Thresholds used by the analysis methods of this class:
        #   WEIGHT_SHIFT_THRESHOLD - multiplied by 100 and compared with the mean
        #                            imbalance percentage.
        #   HIP_DROP_THRESHOLD     - compared with the left/right hip y difference.
        #   POSTURE_SWAY_THRESHOLD - compared with the std of the shoulder midpoint x.
        self.WEIGHT_SHIFT_THRESHOLD = 0.15
        self.HIP_DROP_THRESHOLD = 10
        self.POSTURE_SWAY_THRESHOLD = 20
        
    def process_video(self, video_path):
        """Process video and extract gait analysis metrics."""
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Video file not found: {video_path}")

        results = self.model(video_path, stream=True, verbose=False)
        frames_data = []
        
        for i, r in enumerate(results):
            if r.keypoints is None or r.keypoints.data.shape[0] == 0:
                frames_data.append({"frame": i, "keypoints": []})
                continue

            kpts = r.keypoints.data[0].cpu().numpy()
            points = []
            for kp in kpts:
                x, y, conf = kp
                if conf > 0.3:
                    points.append((float(x), float(y)))
                else:
                    points.append(None)

            frames_data.append({"frame": int(i), "keypoints": points})

        return self._make_serializable(self.analyze_gait(frames_data))

    def _make_serializable(self, obj):
        """Recursively convert numpy data types to native python types for JSON serialization."""
        if isinstance(obj, dict):
            return {k: self._make_serializable(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [self._make_serializable(v) for v in obj]
        elif isinstance(obj, tuple):
            return tuple(self._make_serializable(v) for v in obj)
        elif isinstance(obj, (np.integer, int)):
            return int(obj)
        elif isinstance(obj, (np.floating, float)):
            return float(obj)
        elif isinstance(obj, np.ndarray):
            return self._make_serializable(obj.tolist())
        else:
            return obj

    def calculate_angle(self, p1, p2, p3):
        """Calculate angle between three points."""
        if p1 is None or p2 is None or p3 is None:
            return None
            
        a = np.array(p1)
        b = np.array(p2)
        c = np.array(p3)
        
        ba = a - b
        bc = c - b
        
        norm_ba = np.linalg.norm(ba)
        norm_bc = np.linalg.norm(bc)
        
        if norm_ba == 0 or norm_bc == 0:
            return None
            
        cosine_angle = np.dot(ba, bc) / (norm_ba * norm_bc)
        angle = np.arccos(np.clip(cosine_angle, -1.0, 1.0))
        
        return np.degrees(angle)

    def calculate_weight_distribution(self, left_ankle, right_ankle, left_hip, right_hip):
        """Calculate weight distribution based on hip and ankle positions."""
        if not all([left_ankle, right_ankle, left_hip, right_hip]):
            return None, None, None
        
        left_hip_y = left_hip[1]
        right_hip_y = right_hip[1]

        # In image coordinates, Y increases downward.
        # A higher hip (lower Y value) = stance/weight-bearing side.
        # So if left_hip_y < right_hip_y, the LEFT hip is higher → weight on LEFT.
        weight_on_left = (left_hip_y - right_hip_y)
        total_weight = abs(weight_on_left) + 1
        weight_ratio = (weight_on_left + total_weight) / (2 * total_weight)
        weight_ratio = np.clip(weight_ratio, 0, 1)

        imbalance_percent = abs(weight_ratio - 0.5) * 200

        if imbalance_percent < 10:
            primary_side = "balanced"
        elif weight_ratio > 0.5:
            primary_side = "left"
        else:
            primary_side = "right"
        
        return weight_ratio, primary_side, imbalance_percent

    def detect_hip_drop(self, left_hip, right_hip):
        """Detect Trendelenburg gait (hip drop on swing side)."""
        if not left_hip or not right_hip:
            return None, None
        
        hip_height_diff = abs(left_hip[1] - right_hip[1])
        
        if hip_height_diff > self.HIP_DROP_THRESHOLD:
            if left_hip[1] > right_hip[1]:
                return "left_hip_drop", "right_side_weakness"
            else:
                return "right_hip_drop", "left_side_weakness"
        
        return None, None

    def assess_postural_strength(self, frames_data):
        """Assess postural strength."""
        trunk_sway = []
        shoulder_heights = []
        head_forward_lean = []
        knee_stability = []
        
        for frame_data in frames_data:
            kps = frame_data.get("keypoints", [])
            if not kps or len(kps) < 17:
                continue
            
            if kps[5] and kps[6]:
                shoulder_center = (kps[5][0] + kps[6][0]) / 2
                trunk_sway.append(shoulder_center)
                shoulder_diff = abs(kps[5][1] - kps[6][1])
                shoulder_heights.append(shoulder_diff)
            
            if kps[0] and kps[5] and kps[6]:
                nose_to_shoulder = kps[0][0] - (kps[5][0] + kps[6][0]) / 2
                head_forward_lean.append(nose_to_shoulder)
            
            if "angles" in frame_data:
                lk = frame_data["angles"].get("left_knee")
                rk = frame_data["angles"].get("right_knee")
                if lk and rk:
                    if lk < 120 and rk < 120:
                        knee_stability.append("weak")
                    elif lk > 160 and rk > 160:
                        knee_stability.append("stiff")
                    else:
                        knee_stability.append("normal")
        
        return {
            "trunk_sway": np.std(trunk_sway) if trunk_sway else 0,
            "shoulder_hiking": np.mean(shoulder_heights) if shoulder_heights else 0,
            "forward_lean": np.mean(np.abs(head_forward_lean)) if head_forward_lean else 0,
            "knee_stability": knee_stability
        }

    def analyze_gait(self, frames_data):
        """Complete gait analysis with all features."""
        
        # Validation for visibility and person detection
        total_frames = len(frames_data)
        
        # Check minimum video length — need at least ~5 seconds (150 frames at 30fps)
        # for enough gait cycles to compute meaningful metrics.
        if total_frames < 150:
            raise ValueError("VIDEO_TOO_SHORT: Video is too short for meaningful analysis. Please provide a longer video.")
            
        valid_frames = 0
        for frame_data in frames_data:
            kps = frame_data.get("keypoints", [])
            # A valid frame has enough confident keypoints (at least 8 keypoints)
            if kps and len([kp for kp in kps if kp is not None]) >= 8:
                valid_frames += 1
                
        if valid_frames == 0:
            raise ValueError("NO_PERSON_DETECTED: No person could be detected in the video.")
            
        visibility_ratio = valid_frames / total_frames
        if visibility_ratio < 0.3:
            raise ValueError("POOR_VISIBILITY: Unable to detect clear movement in most frames. This may be due to bad video quality, loose clothing, bad recording angle, or the subject not being clearly visible.")

        analysis_results = {
            "frames": [],
            "summary": {},
            "weight_distribution": {
                "overall_imbalance_percent": 0,
                "primary_weight_side": "unknown",
                "weight_shift_issues": [],
                "hip_drop_analysis": []
            },
            "postural_strength": {
                "trunk_sway_severity": "normal",
                "shoulder_stability": "normal",
                "forward_lean": "normal",
                "knee_stability": "normal",
                "overall_strength_assessment": "adequate"
            },
            "gait_phases": {
                "stance_time_l": 0,
                "stance_time_r": 0,
                "swing_time_l": 0,
                "swing_time_r": 0,
                "double_support_frames": 0,
                "phase_asymmetry": 0
            },
            "risk_indicators": [],
            "flags": {},
            "observations": {
                "trunk": "Neutral",
                "head_neck": "Neutral & mobile",
                "arms_hands": "Free arm swing",
                "lower_limbs": "Not observed",
                "symmetry": "Symmetrical",
                "weight_distribution": "Equal",
                "postural_control": "Good"
            },
            "compensatory_strategies": [],
            "reflex_influence": "None observed",
            "safety_note": "",
            "clinical_notes": []
        }
        
        left_knee_angles = []
        right_knee_angles = []
        left_hip_angles = []
        right_hip_angles = []
        left_ankle_heights = []
        right_ankle_heights = []
        weight_distributions = []
        hip_drops = []
        trunk_rotations = []
        arm_heights = []
        nose_positions = []
        
        # First Pass: Calculate angles, weights, and collect gait phase data
        for frame_data in frames_data:
            kps = frame_data["keypoints"]
            if not kps or len(kps) < 17:
                analysis_results["frames"].append(frame_data)
                continue

            # Angles
            l_knee = self.calculate_angle(kps[11], kps[13], kps[15])
            r_knee = self.calculate_angle(kps[12], kps[14], kps[16])
            l_hip = self.calculate_angle(kps[5], kps[11], kps[13])
            r_hip = self.calculate_angle(kps[6], kps[12], kps[14])
            
            frame_data["angles"] = {
                "left_knee": l_knee,
                "right_knee": r_knee,
                "left_hip": l_hip,
                "right_hip": r_hip
            }
            
            if l_knee: left_knee_angles.append(l_knee)
            if r_knee: right_knee_angles.append(r_knee)
            if l_hip: left_hip_angles.append(l_hip)
            if r_hip: right_hip_angles.append(r_hip)
            
            # Weight Distribution Analysis
            weight_ratio, primary_side, imbalance = self.calculate_weight_distribution(
                kps[15], kps[16], kps[11], kps[12]
            )
            
            if weight_ratio is not None:
                weight_distributions.append({
                    "ratio": weight_ratio,
                    "primary_side": primary_side,
                    "imbalance_percent": imbalance
                })
                frame_data["weight_distribution"] = {
                    "ratio": weight_ratio,
                    "primary_side": primary_side,
                    "imbalance_percent": imbalance
                }
            
            # Hip Drop Detection
            hip_drop, weakness = self.detect_hip_drop(kps[11], kps[12])
            if hip_drop:
                hip_drops.append({"type": hip_drop, "weakness": weakness})
            
            # Trunk rotation
            if kps[5] and kps[6]:
                trunk_rotations.append(abs(kps[5][0] - kps[6][0]))
            
            # Arm height for guarding
            if kps[5] and kps[9]:
                arm_heights.append(kps[5][1] - kps[9][1])
            
            # Ankle heights for gait phase detection
            if kps[15]:
                left_ankle_heights.append(kps[15][1])
            else:
                left_ankle_heights.append(None)
                
            if kps[16]:
                right_ankle_heights.append(kps[16][1])
            else:
                right_ankle_heights.append(None)
            
            # Nose position for head stability
            if kps[0]:
                nose_positions.append(kps[0][1])
            
            analysis_results["frames"].append(frame_data)
        
        # Gait Phase Detection
        def detect_phases(heights):
            phases = []
            if not heights: return phases
            valid_heights = [h for h in heights if h is not None]
            if not valid_heights: return [None] * len(heights)
            ground_threshold = np.percentile(valid_heights, 80)
            for h in heights:
                if h is None: phases.append(None)
                elif h >= ground_threshold: phases.append("stance")
                else: phases.append("swing")
            return phases

        l_phases = detect_phases(left_ankle_heights)
        r_phases = detect_phases(right_ankle_heights)
        
        stance_time_l = l_phases.count("stance")
        swing_time_l = l_phases.count("swing")
        stance_time_r = r_phases.count("stance")
        swing_time_r = r_phases.count("swing")
        
        double_support_frames = 0
        for lp, rp in zip(l_phases, r_phases):
            if lp == "stance" and rp == "stance":
                double_support_frames += 1
        
        phase_asymmetry = 0
        if max(stance_time_l, stance_time_r) > 0:
            phase_asymmetry = (abs(stance_time_l - stance_time_r) / max(stance_time_l, stance_time_r)) * 100
        
        analysis_results["gait_phases"] = {
            "stance_time_l": stance_time_l,
            "stance_time_r": stance_time_r,
            "swing_time_l": swing_time_l,
            "swing_time_r": swing_time_r,
            "double_support_frames": double_support_frames,
            "phase_asymmetry": phase_asymmetry
        }
        
        # Postural Strength Assessment
        strength_assessment = self.assess_postural_strength(frames_data)
        
        # Analyze Weight Distribution
        overall_imbalance = 0
        primary_side = "balanced"
        
        if weight_distributions:
            imbalances = [w["imbalance_percent"] for w in weight_distributions]
            overall_imbalance = np.mean(imbalances)
            primary_sides = [w["primary_side"] for w in weight_distributions]
            primary_side = max(set([s for s in primary_sides if s != "balanced"]), 
                             key=primary_sides.count, default="balanced")
            
            analysis_results["weight_distribution"]["overall_imbalance_percent"] = overall_imbalance
            analysis_results["weight_distribution"]["primary_weight_side"] = primary_side
            
            if overall_imbalance > self.WEIGHT_SHIFT_THRESHOLD * 100:
                analysis_results["weight_distribution"]["weight_shift_issues"].append(
                    f"Significant weight shift ({overall_imbalance:.1f}% imbalance) - "
                    f"predominantly loading {primary_side} side"
                )
                analysis_results["observations"]["weight_distribution"] = f"Asymmetrical - favoring {primary_side}"
                analysis_results["clinical_notes"].append(
                    f"WEIGHT DISTRIBUTION: Child shows {overall_imbalance:.1f}% weight shift toward {primary_side} side. "
                    f"Suggests pain avoidance, weakness, or motor planning issue on opposite side."
                )
            
            if hip_drops:
                for drop in hip_drops:
                    analysis_results["weight_distribution"]["hip_drop_analysis"].append(drop)
                    analysis_results["clinical_notes"].append(
                        f"HIP DROP: {drop['type']} indicates {drop['weakness']} - "
                        f"weakness in hip abductors or gluteus medius"
                    )
        
        # Postural Strength Scoring
        trunk_sway = strength_assessment["trunk_sway"]
        if trunk_sway > self.POSTURE_SWAY_THRESHOLD:
            analysis_results["postural_strength"]["trunk_sway_severity"] = "excessive"
            analysis_results["observations"]["postural_control"] = "Weak - excessive sway"
            analysis_results["clinical_notes"].append(
                f"TRUNK SWAY: Significant lateral sway (std: {trunk_sway:.1f}px). "
                f"Suggests weak core stability."
            )
        
        shoulder_hike = strength_assessment["shoulder_hiking"]
        if shoulder_hike > 5:
            analysis_results["postural_strength"]["shoulder_stability"] = "hiking"
            analysis_results["compensatory_strategies"].append("Shoulder hiking for stability")
            analysis_results["clinical_notes"].append(
                f"SHOULDER HIKING: Upper trapezius compensation for core/trunk weakness."
            )
        
        forward_lean = strength_assessment["forward_lean"]
        if forward_lean > 15:
            analysis_results["postural_strength"]["forward_lean"] = "excessive"
            analysis_results["clinical_notes"].append(
                f"FORWARD HEAD POSTURE: Head forward {forward_lean:.1f}px. "
                f"Weak neck/shoulder stabilizers or balance compensation."
            )
        
        knee_issues = strength_assessment["knee_stability"]
        if knee_issues.count("weak") > len(knee_issues) * 0.5:
            analysis_results["postural_strength"]["knee_stability"] = "weak"
            analysis_results["clinical_notes"].append(
                f"KNEE WEAKNESS: Insufficient extension/flexion control. "
                f"Quad weakness or motor planning issues."
            )
        
        weakness_indicators = sum([
            trunk_sway > self.POSTURE_SWAY_THRESHOLD,
            shoulder_hike > 5,
            forward_lean > 15,
            knee_issues.count("weak") > len(knee_issues) * 0.5
        ])
        
        if weakness_indicators >= 2:
            analysis_results["postural_strength"]["overall_strength_assessment"] = "inadequate"
            analysis_results["observations"]["postural_control"] = "Floppy/weak - multiple issues"
            analysis_results["clinical_notes"].append(
                f"POSTURAL ASSESSMENT: Low muscle tone/weakness ({weakness_indicators} issues detected). "
                f"Recommend strength assessment."
            )
        
        # Original Observations Logic
        # Trunk
        if trunk_rotations:
            sway = np.std(trunk_rotations)
            if sway > 15: 
                analysis_results["observations"]["trunk"] = "Stiff / reduced rotation"
                analysis_results["compensatory_strategies"].append("Trunk used for balance")
            elif sway > 10: 
                analysis_results["observations"]["trunk"] = "Rotated"
                analysis_results["compensatory_strategies"].append("Trunk rotation")

        # Head & Neck (vertical stability)
        if nose_positions:
            neck_stiffness = np.std(nose_positions)
            if neck_stiffness < 2: 
                analysis_results["observations"]["head_neck"] = "Fixed / stiff"
                analysis_results["compensatory_strategies"].append("Neck stiffness for stability")

        # Arms
        if arm_heights:
            avg_arm_h = np.mean(arm_heights)
            if avg_arm_h < -10:
                analysis_results["observations"]["arms_hands"] = "Hands guarded"
                analysis_results["compensatory_strategies"].append("Hands used for security")

        # Reflex Influence Detection
        signs = []
        if analysis_results["observations"]["trunk"] == "Rotated": 
            signs.append("ATNR influence")
        if "Hands guarded" in analysis_results["observations"]["arms_hands"]: 
            signs.append("Palmar influence")
        if "Neck stiffness" in str(analysis_results["compensatory_strategies"]): 
            signs.append("Fear Paralysis influence")
        
        if len(signs) >= 2:
            analysis_results["reflex_influence"] = "Movement patterns suggest immature postural and protective responses consistent with incomplete primitive reflex integration."
            analysis_results["observations"]["symmetry"] = "Mild asymmetry"
        else:
            analysis_results["reflex_influence"] = ""

        # Numerical Safety Checks
        num_steps = stance_time_l // 10
        is_safe = self.independent_walking and num_steps >= 5 and len(frames_data) > 90
        
        if not is_safe:
            analysis_results["safety_note"] = "Numerical interpretation limited due to context."
        
        # Stability: measures trunk steadiness only. Weight imbalance has its own score.
        # Cap the trunk sway penalty at 50 points so the score stays meaningful.
        trunk_sway_penalty = min(50, np.std(trunk_rotations) * 2 if trunk_rotations else 0)
        stability_score = max(0, 100 - trunk_sway_penalty)

        # Symmetry: based on gait phase asymmetry. Each 1% asymmetry = 1 point deducted.
        sym_score = max(0, 100 - phase_asymmetry)

        # Strength: each weakness indicator costs 25 points
        strength_score = max(0, 100 - (weakness_indicators * 25))

        # Weight shift: overall_imbalance is already 0-100% so maps directly
        weight_shift_score = max(0, 100 - overall_imbalance) if weight_distributions else 100
        
        analysis_results["summary"] = {
            "stability_score": stability_score,
            "symmetry_score": sym_score,
            "weight_shift_score": weight_shift_score,
            "strength_score": strength_score,
            "phase_asymmetry_percent": phase_asymmetry,
            "overall_gait_quality": "concerning" if weakness_indicators >= 2 else "acceptable",
            "walking_condition": "Independent" if self.independent_walking else "Assisted"
        }
        
        return analysis_results


if __name__ == "__main__":
    # Smoke check when run as a script: builds an analyzer with the default
    # model name (which instantiates YOLO in __init__) and prints a
    # confirmation. No video is processed here.
    analyzer = EnhancedGaitAnalyzer(age=10)
    print("Enhanced Gait Analyzer initialized with all features.")