Spaces:
Sleeping
Sleeping
| import cv2 | |
| import numpy as np | |
| import os | |
| from ultralytics import YOLO | |
| from collections import deque | |
class EnhancedGaitAnalyzer:
    """Derive gait, weight-shift and posture heuristics from 2D pose keypoints.

    Keypoints follow the COCO 17-point layout listed in ``KEYPOINT_MAP``.
    Every keypoint is either an ``(x, y)`` tuple in image pixels or ``None``
    when it was not detected confidently.  In image coordinates Y grows
    downward, so a *smaller* Y means a point that is *higher* in the frame.
    """

    # Limits that used to be inline literals.  They live on the class so a
    # subclass or a single instance can override them.
    NUM_KEYPOINTS = 17             # keypoints required to analyze a frame
    MIN_KEYPOINT_CONFIDENCE = 0.3  # below this a keypoint is stored as None
    MIN_FRAMES = 150               # ~5 s at 30 fps
    MIN_VALID_KEYPOINTS = 8        # confident keypoints for a "valid" frame
    MIN_VISIBILITY_RATIO = 0.3     # share of frames that must be valid
    SHOULDER_HIKE_THRESHOLD = 5    # mean shoulder height difference, px
    FORWARD_LEAN_THRESHOLD = 15    # mean |nose x - shoulder midpoint x|, px

    def __init__(self, model_name="yolo11n-pose.pt", age=4, independent_walking=True):
        """
        Enhanced Gait Analyzer with weight shifting, postural strength, AND original gait analysis.

        Args:
            model_name: Weights name/path handed to ``YOLO``.
            age: Stored on the instance; not read by the methods of this class.
            independent_walking: Whether the subject walks unaided.  Feeds the
                safety note and the ``walking_condition`` summary field.
        """
        self.model = YOLO(model_name)
        self.age = age
        self.independent_walking = independent_walking

        # COCO 17-Keypoint Mapping
        self.KEYPOINT_MAP = {
            0: "Nose", 1: "LEye", 2: "REye", 3: "LEar", 4: "REar",
            5: "LShoulder", 6: "RShoulder", 7: "LElbow", 8: "RElbow",
            9: "LWrist", 10: "RWrist", 11: "LHip", 12: "RHip",
            13: "LKnee", 14: "RKnee", 15: "LAnkle", 16: "RAnkle",
        }

        # Thresholds
        self.WEIGHT_SHIFT_THRESHOLD = 0.15  # compared as a percentage (x100)
        self.HIP_DROP_THRESHOLD = 10        # hip height difference, px
        self.POSTURE_SWAY_THRESHOLD = 20    # std of shoulder midpoint x, px

    def process_video(self, video_path):
        """Process video and extract gait analysis metrics.

        Runs the pose model over the video, keeps the first detected person
        of each frame, and passes the per-frame keypoints to ``analyze_gait``.

        Args:
            video_path: Path of the video file.

        Returns:
            The ``analyze_gait`` result with numpy values converted to native
            Python types.

        Raises:
            FileNotFoundError: If ``video_path`` does not exist.
            ValueError: Propagated from ``analyze_gait`` validation.
        """
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Video file not found: {video_path}")

        results = self.model(video_path, stream=True, verbose=False)
        frames_data = []
        for i, r in enumerate(results):
            if r.keypoints is None or r.keypoints.data.shape[0] == 0:
                # Nothing detected: keep the frame so frame counts stay honest.
                frames_data.append({"frame": i, "keypoints": []})
                continue

            # Only the first detection is analyzed.
            kpts = r.keypoints.data[0].cpu().numpy()
            points = [
                (float(x), float(y)) if conf > self.MIN_KEYPOINT_CONFIDENCE else None
                for x, y, conf in kpts
            ]
            frames_data.append({"frame": int(i), "keypoints": points})

        return self._make_serializable(self.analyze_gait(frames_data))

    def _make_serializable(self, obj):
        """Recursively convert numpy data types to native python types for JSON serialization.

        Booleans are checked before integers: ``bool`` is a subclass of
        ``int``, so the integer branch would otherwise turn ``True`` into
        ``1``.  ``np.bool_`` is converted as well because the ``json`` module
        cannot encode it.
        """
        if isinstance(obj, dict):
            return {k: self._make_serializable(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [self._make_serializable(v) for v in obj]
        if isinstance(obj, tuple):
            return tuple(self._make_serializable(v) for v in obj)
        if isinstance(obj, np.ndarray):
            return self._make_serializable(obj.tolist())
        if isinstance(obj, (bool, np.bool_)):
            return bool(obj)
        if isinstance(obj, (np.integer, int)):
            return int(obj)
        if isinstance(obj, (np.floating, float)):
            return float(obj)
        return obj

    def calculate_angle(self, p1, p2, p3):
        """Calculate angle between three points.

        Args:
            p1, p2, p3: ``(x, y)`` points or ``None``; ``p2`` is the vertex.

        Returns:
            The angle at ``p2`` in degrees (0-180), or ``None`` when a point is
            missing or either arm of the angle has zero length.
        """
        if p1 is None or p2 is None or p3 is None:
            return None

        vertex = np.array(p2)
        ba = np.array(p1) - vertex
        bc = np.array(p3) - vertex
        norm_ba = np.linalg.norm(ba)
        norm_bc = np.linalg.norm(bc)
        if norm_ba == 0 or norm_bc == 0:
            return None

        # Clip guards arccos against rounding slightly outside [-1, 1].
        cosine_angle = np.clip(np.dot(ba, bc) / (norm_ba * norm_bc), -1.0, 1.0)
        return np.degrees(np.arccos(cosine_angle))

    def calculate_weight_distribution(self, left_ankle, right_ankle, left_hip, right_hip):
        """Calculate weight distribution based on hip and ankle positions.

        The estimate uses hip heights only; the ankles merely have to be
        present for the frame to be scored.

        Returns:
            ``(weight_ratio, primary_side, imbalance_percent)`` where
            ``weight_ratio`` is in [0, 1] (above 0.5 means left-loaded),
            ``primary_side`` is ``"left"``, ``"right"`` or ``"balanced"`` and
            ``imbalance_percent`` is in [0, 100].  ``(None, None, None)`` when
            any of the four keypoints is missing.
        """
        if not (left_ankle and right_ankle and left_hip and right_hip):
            return None, None, None

        # In image coordinates, Y increases downward.
        # A higher hip (lower Y value) = stance/weight-bearing side.
        # So if left_hip_y < right_hip_y, the LEFT hip is higher -> weight on LEFT,
        # which makes the signed quantity below positive.  (The operands were
        # previously swapped, reporting the opposite side and disagreeing
        # with detect_hip_drop.)
        weight_on_left = right_hip[1] - left_hip[1]
        total_weight = abs(weight_on_left) + 1  # +1 avoids division by zero
        weight_ratio = (weight_on_left + total_weight) / (2 * total_weight)
        weight_ratio = np.clip(weight_ratio, 0, 1)
        imbalance_percent = abs(weight_ratio - 0.5) * 200

        if imbalance_percent < 10:
            primary_side = "balanced"
        elif weight_ratio > 0.5:
            primary_side = "left"
        else:
            primary_side = "right"
        return weight_ratio, primary_side, imbalance_percent

    def detect_hip_drop(self, left_hip, right_hip):
        """Detect Trendelenburg gait (hip drop on swing side).

        Returns:
            ``(drop_type, weakness)`` when the hips differ in height by more
            than ``HIP_DROP_THRESHOLD`` pixels, otherwise ``(None, None)``.
            The lower hip (larger Y) is reported as dropped and the opposite
            side as weak.
        """
        if not left_hip or not right_hip:
            return None, None

        if abs(left_hip[1] - right_hip[1]) <= self.HIP_DROP_THRESHOLD:
            return None, None
        if left_hip[1] > right_hip[1]:
            return "left_hip_drop", "right_side_weakness"
        return "right_hip_drop", "left_side_weakness"

    def assess_postural_strength(self, frames_data):
        """Assess postural strength.

        Frames with fewer than ``NUM_KEYPOINTS`` keypoints are skipped.  Knee
        classification reads ``frame_data["angles"]``, which ``analyze_gait``
        fills in before calling this method.

        Returns:
            dict with
            ``trunk_sway`` - std of the shoulder midpoint x (px),
            ``shoulder_hiking`` - mean |left - right| shoulder height (px),
            ``forward_lean`` - mean |nose x - shoulder midpoint x| (px),
            ``knee_stability`` - per-frame ``"weak"``/``"stiff"``/``"normal"``.
            Numeric entries are 0 when no frame contributed.
        """
        trunk_sway = []
        shoulder_heights = []
        head_forward_lean = []
        knee_stability = []

        for frame_data in frames_data:
            kps = frame_data.get("keypoints") or []
            if len(kps) < self.NUM_KEYPOINTS:
                continue

            nose, l_shoulder, r_shoulder = kps[0], kps[5], kps[6]
            if l_shoulder and r_shoulder:
                shoulder_center = (l_shoulder[0] + r_shoulder[0]) / 2
                trunk_sway.append(shoulder_center)
                shoulder_heights.append(abs(l_shoulder[1] - r_shoulder[1]))
                if nose:
                    head_forward_lean.append(nose[0] - shoulder_center)

            angles = frame_data.get("angles")
            if angles is None:
                continue
            lk = angles.get("left_knee")
            rk = angles.get("right_knee")
            # Explicit None test: a truthiness test would discard 0.0 degrees.
            if lk is None or rk is None:
                continue
            if lk < 120 and rk < 120:
                knee_stability.append("weak")
            elif lk > 160 and rk > 160:
                knee_stability.append("stiff")
            else:
                knee_stability.append("normal")

        return {
            "trunk_sway": np.std(trunk_sway) if trunk_sway else 0,
            "shoulder_hiking": np.mean(shoulder_heights) if shoulder_heights else 0,
            "forward_lean": np.mean(np.abs(head_forward_lean)) if head_forward_lean else 0,
            "knee_stability": knee_stability,
        }

    def _validate_frames(self, frames_data):
        """Raise ``ValueError`` when the frames cannot support an analysis."""
        total_frames = len(frames_data)
        # Check minimum video length - need at least ~5 seconds (150 frames at 30fps)
        # for enough gait cycles to compute meaningful metrics.
        if total_frames < self.MIN_FRAMES:
            raise ValueError("VIDEO_TOO_SHORT: Video is too short for meaningful analysis. Please provide a longer video.")

        # A valid frame has enough confident (non-None) keypoints.
        valid_frames = sum(
            1
            for frame_data in frames_data
            if sum(kp is not None for kp in (frame_data.get("keypoints") or []))
            >= self.MIN_VALID_KEYPOINTS
        )
        if valid_frames == 0:
            raise ValueError("NO_PERSON_DETECTED: No person could be detected in the video.")
        if valid_frames / total_frames < self.MIN_VISIBILITY_RATIO:
            raise ValueError("POOR_VISIBILITY: Unable to detect clear movement in most frames. This may be due to bad video quality, loose clothing, bad recording angle, or the subject not being clearly visible.")

    @staticmethod
    def _empty_results():
        """Return the result skeleton with its default values."""
        return {
            "frames": [],
            "summary": {},
            "weight_distribution": {
                "overall_imbalance_percent": 0,
                "primary_weight_side": "unknown",
                "weight_shift_issues": [],
                "hip_drop_analysis": [],
            },
            "postural_strength": {
                "trunk_sway_severity": "normal",
                "shoulder_stability": "normal",
                "forward_lean": "normal",
                "knee_stability": "normal",
                "overall_strength_assessment": "adequate",
            },
            "gait_phases": {
                "stance_time_l": 0,
                "stance_time_r": 0,
                "swing_time_l": 0,
                "swing_time_r": 0,
                "double_support_frames": 0,
                "phase_asymmetry": 0,
            },
            "risk_indicators": [],
            "flags": {},
            "observations": {
                "trunk": "Neutral",
                "head_neck": "Neutral & mobile",
                "arms_hands": "Free arm swing",
                "lower_limbs": "Not observed",
                "symmetry": "Symmetrical",
                "weight_distribution": "Equal",
                "postural_control": "Good",
            },
            "compensatory_strategies": [],
            "reflex_influence": "None observed",
            "safety_note": "",
            "clinical_notes": [],
        }

    @staticmethod
    def _detect_phases(heights):
        """Label each ankle height as ``"stance"``, ``"swing"`` or ``None``.

        Heights at or above the 80th percentile of the observed values (i.e.
        lowest in the image) count as stance; ``None`` entries stay ``None``.
        """
        observed = [h for h in heights if h is not None]
        if not observed:
            return [None] * len(heights)
        ground_threshold = np.percentile(observed, 80)
        return [
            None if h is None else ("stance" if h >= ground_threshold else "swing")
            for h in heights
        ]

    def analyze_gait(self, frames_data):
        """Complete gait analysis with all features.

        Args:
            frames_data: List of ``{"frame": int, "keypoints": [...]}`` dicts.
                Analyzable frames are mutated in place: ``"angles"`` and, when
                available, ``"weight_distribution"`` are added to them.

        Returns:
            dict with ``frames``, ``summary``, ``weight_distribution``,
            ``postural_strength``, ``gait_phases``, ``observations``,
            ``compensatory_strategies``, ``reflex_influence``, ``safety_note``
            and ``clinical_notes`` (plus the untouched ``risk_indicators`` and
            ``flags`` placeholders).  Values may be numpy scalars.

        Raises:
            ValueError: With a ``VIDEO_TOO_SHORT``, ``NO_PERSON_DETECTED`` or
                ``POOR_VISIBILITY`` prefix when validation fails.
        """
        self._validate_frames(frames_data)
        analysis_results = self._empty_results()
        observations = analysis_results["observations"]
        strategies = analysis_results["compensatory_strategies"]
        notes = analysis_results["clinical_notes"]
        weight_summary = analysis_results["weight_distribution"]
        posture_summary = analysis_results["postural_strength"]

        left_ankle_heights = []
        right_ankle_heights = []
        weight_distributions = []
        hip_drops = []
        trunk_rotations = []
        arm_heights = []
        nose_positions = []

        # First Pass: Calculate angles, weights, and collect gait phase data
        for frame_data in frames_data:
            kps = frame_data.get("keypoints") or []
            if len(kps) < self.NUM_KEYPOINTS:
                analysis_results["frames"].append(frame_data)
                continue

            # Angles (stored on the frame; assess_postural_strength reads them)
            frame_data["angles"] = {
                "left_knee": self.calculate_angle(kps[11], kps[13], kps[15]),
                "right_knee": self.calculate_angle(kps[12], kps[14], kps[16]),
                "left_hip": self.calculate_angle(kps[5], kps[11], kps[13]),
                "right_hip": self.calculate_angle(kps[6], kps[12], kps[14]),
            }

            # Weight Distribution Analysis
            weight_ratio, frame_side, imbalance = self.calculate_weight_distribution(
                kps[15], kps[16], kps[11], kps[12]
            )
            if weight_ratio is not None:
                weight_distributions.append({
                    "ratio": weight_ratio,
                    "primary_side": frame_side,
                    "imbalance_percent": imbalance,
                })
                frame_data["weight_distribution"] = {
                    "ratio": weight_ratio,
                    "primary_side": frame_side,
                    "imbalance_percent": imbalance,
                }

            # Hip Drop Detection
            hip_drop, weakness = self.detect_hip_drop(kps[11], kps[12])
            if hip_drop:
                hip_drops.append({"type": hip_drop, "weakness": weakness})

            # Trunk rotation (apparent shoulder width)
            if kps[5] and kps[6]:
                trunk_rotations.append(abs(kps[5][0] - kps[6][0]))

            # Arm height for guarding (left shoulder y minus left wrist y)
            if kps[5] and kps[9]:
                arm_heights.append(kps[5][1] - kps[9][1])

            # Ankle heights for gait phase detection.  None keeps the two
            # lists aligned frame by frame.
            left_ankle_heights.append(kps[15][1] if kps[15] else None)
            right_ankle_heights.append(kps[16][1] if kps[16] else None)

            # Nose position for head stability
            if kps[0]:
                nose_positions.append(kps[0][1])

            analysis_results["frames"].append(frame_data)

        # Gait Phase Detection
        l_phases = self._detect_phases(left_ankle_heights)
        r_phases = self._detect_phases(right_ankle_heights)
        stance_time_l = l_phases.count("stance")
        swing_time_l = l_phases.count("swing")
        stance_time_r = r_phases.count("stance")
        swing_time_r = r_phases.count("swing")
        double_support_frames = sum(
            1 for lp, rp in zip(l_phases, r_phases) if lp == "stance" and rp == "stance"
        )

        phase_asymmetry = 0
        longest_stance = max(stance_time_l, stance_time_r)
        if longest_stance > 0:
            phase_asymmetry = (abs(stance_time_l - stance_time_r) / longest_stance) * 100

        analysis_results["gait_phases"] = {
            "stance_time_l": stance_time_l,
            "stance_time_r": stance_time_r,
            "swing_time_l": swing_time_l,
            "swing_time_r": swing_time_r,
            "double_support_frames": double_support_frames,
            "phase_asymmetry": phase_asymmetry,
        }

        # Postural Strength Assessment
        strength_assessment = self.assess_postural_strength(frames_data)

        # Analyze Weight Distribution
        overall_imbalance = 0
        primary_side = "balanced"
        if weight_distributions:
            overall_imbalance = np.mean([w["imbalance_percent"] for w in weight_distributions])
            primary_sides = [w["primary_side"] for w in weight_distributions]
            # Sorting the candidates makes a left/right tie deterministic
            # (bare set iteration order depends on string hashing).
            primary_side = max(
                sorted({s for s in primary_sides if s != "balanced"}),
                key=primary_sides.count,
                default="balanced",
            )

        weight_summary["overall_imbalance_percent"] = overall_imbalance
        weight_summary["primary_weight_side"] = primary_side

        if overall_imbalance > self.WEIGHT_SHIFT_THRESHOLD * 100:
            weight_summary["weight_shift_issues"].append(
                f"Significant weight shift ({overall_imbalance:.1f}% imbalance) - "
                f"predominantly loading {primary_side} side"
            )
            observations["weight_distribution"] = f"Asymmetrical - favoring {primary_side}"
            notes.append(
                f"WEIGHT DISTRIBUTION: Child shows {overall_imbalance:.1f}% weight shift toward {primary_side} side. "
                f"Suggests pain avoidance, weakness, or motor planning issue on opposite side."
            )

        # Every per-frame hip drop is kept in hip_drop_analysis, but the
        # clinical note is written once per drop type rather than once per
        # frame, which used to flood the notes with identical lines.
        weight_summary["hip_drop_analysis"].extend(hip_drops)
        noted_drop_types = set()
        for drop in hip_drops:
            if drop["type"] in noted_drop_types:
                continue
            noted_drop_types.add(drop["type"])
            notes.append(
                f"HIP DROP: {drop['type']} indicates {drop['weakness']} - "
                f"weakness in hip abductors or gluteus medius"
            )

        # Postural Strength Scoring.  Each condition is evaluated once and
        # reused for both the notes and the indicator count.
        trunk_sway = strength_assessment["trunk_sway"]
        shoulder_hike = strength_assessment["shoulder_hiking"]
        forward_lean = strength_assessment["forward_lean"]
        knee_issues = strength_assessment["knee_stability"]

        sway_excessive = bool(trunk_sway > self.POSTURE_SWAY_THRESHOLD)
        shoulders_hiking = bool(shoulder_hike > self.SHOULDER_HIKE_THRESHOLD)
        lean_excessive = bool(forward_lean > self.FORWARD_LEAN_THRESHOLD)
        knees_weak = knee_issues.count("weak") > len(knee_issues) * 0.5

        if sway_excessive:
            posture_summary["trunk_sway_severity"] = "excessive"
            observations["postural_control"] = "Weak - excessive sway"
            notes.append(
                f"TRUNK SWAY: Significant lateral sway (std: {trunk_sway:.1f}px). "
                f"Suggests weak core stability."
            )

        if shoulders_hiking:
            posture_summary["shoulder_stability"] = "hiking"
            strategies.append("Shoulder hiking for stability")
            notes.append(
                "SHOULDER HIKING: Upper trapezius compensation for core/trunk weakness."
            )

        if lean_excessive:
            posture_summary["forward_lean"] = "excessive"
            notes.append(
                f"FORWARD HEAD POSTURE: Head forward {forward_lean:.1f}px. "
                f"Weak neck/shoulder stabilizers or balance compensation."
            )

        if knees_weak:
            posture_summary["knee_stability"] = "weak"
            notes.append(
                "KNEE WEAKNESS: Insufficient extension/flexion control. "
                "Quad weakness or motor planning issues."
            )

        weakness_indicators = sum(
            [sway_excessive, shoulders_hiking, lean_excessive, knees_weak]
        )
        if weakness_indicators >= 2:
            posture_summary["overall_strength_assessment"] = "inadequate"
            observations["postural_control"] = "Floppy/weak - multiple issues"
            notes.append(
                f"POSTURAL ASSESSMENT: Low muscle tone/weakness ({weakness_indicators} issues detected). "
                f"Recommend strength assessment."
            )

        # Original Observations Logic
        # Trunk
        rotation_spread = np.std(trunk_rotations) if trunk_rotations else None
        if rotation_spread is not None:
            if rotation_spread > 15:
                observations["trunk"] = "Stiff / reduced rotation"
                strategies.append("Trunk used for balance")
            elif rotation_spread > 10:
                observations["trunk"] = "Rotated"
                strategies.append("Trunk rotation")

        # Head & Neck (vertical stability)
        if nose_positions and np.std(nose_positions) < 2:
            observations["head_neck"] = "Fixed / stiff"
            strategies.append("Neck stiffness for stability")

        # Arms
        if arm_heights and np.mean(arm_heights) < -10:
            observations["arms_hands"] = "Hands guarded"
            strategies.append("Hands used for security")

        # Reflex Influence Detection
        signs = []
        if observations["trunk"] == "Rotated":
            signs.append("ATNR influence")
        if "Hands guarded" in observations["arms_hands"]:
            signs.append("Palmar influence")
        if any("Neck stiffness" in strategy for strategy in strategies):
            signs.append("Fear Paralysis influence")

        if len(signs) >= 2:
            analysis_results["reflex_influence"] = "Movement patterns suggest immature postural and protective responses consistent with incomplete primitive reflex integration."
            observations["symmetry"] = "Mild asymmetry"
        else:
            analysis_results["reflex_influence"] = ""

        # Numerical Safety Checks
        num_steps = stance_time_l // 10
        is_safe = self.independent_walking and num_steps >= 5 and len(frames_data) > 90
        if not is_safe:
            analysis_results["safety_note"] = "Numerical interpretation limited due to context."

        # Stability: measures trunk steadiness only. Weight imbalance has its own score.
        # Cap the trunk sway penalty at 50 points so the score stays meaningful.
        trunk_sway_penalty = min(50, rotation_spread * 2 if rotation_spread is not None else 0)
        stability_score = max(0, 100 - trunk_sway_penalty)
        # Symmetry: based on gait phase asymmetry. Each 1% asymmetry = 1 point deducted.
        sym_score = max(0, 100 - phase_asymmetry)
        # Strength: each weakness indicator costs 25 points
        strength_score = max(0, 100 - (weakness_indicators * 25))
        # Weight shift: overall_imbalance is already 0-100% so maps directly
        weight_shift_score = max(0, 100 - overall_imbalance) if weight_distributions else 100

        analysis_results["summary"] = {
            "stability_score": stability_score,
            "symmetry_score": sym_score,
            "weight_shift_score": weight_shift_score,
            "strength_score": strength_score,
            "phase_asymmetry_percent": phase_asymmetry,
            "overall_gait_quality": "concerning" if weakness_indicators >= 2 else "acceptable",
            "walking_condition": "Independent" if self.independent_walking else "Assisted",
        }
        return analysis_results
if __name__ == "__main__":
    # Smoke check: constructing the analyzer runs __init__, which builds the
    # YOLO pose model, and then reports success.
    gait_analyzer = EnhancedGaitAnalyzer(age=10)
    print("Enhanced Gait Analyzer initialized with all features.")