import os
from collections import Counter, deque

import numpy as np

# The vision stack is optional at import time so that the pure analysis
# methods (which only need numpy) stay importable without it. The pose model
# itself is still required to construct an analyzer; see __init__.
try:
    import cv2
except ImportError:
    cv2 = None

try:
    from ultralytics import YOLO
except ImportError:
    YOLO = None


class EnhancedGaitAnalyzer:
    """Pose-based gait analyzer.

    Runs a YOLO pose model over a video, then derives joint angles, a
    hip-height based weight-distribution estimate, hip-drop events, gait
    phases, postural-strength indicators and summary scores from the
    per-frame keypoints.

    All distances are in image pixels and Y grows downward.
    """

    # Keypoints whose model confidence is not above this value are treated as
    # missing (stored as None) by process_video.
    MIN_KEYPOINT_CONFIDENCE = 0.3

    def __init__(self, model_name="yolo11n-pose.pt", age=4, independent_walking=True):
        """
        Enhanced Gait Analyzer with weight shifting, postural strength,
        AND original gait analysis.

        Args:
            model_name: Weights name/path handed to ``YOLO``.
            age: Stored on the instance; not read by any method in this class.
            independent_walking: Whether the subject walks unassisted. Feeds
                the safety note and the ``walking_condition`` summary field.

        Raises:
            ImportError: If the ``ultralytics`` package is not installed.
        """
        if YOLO is None:
            raise ImportError(
                "The 'ultralytics' package is required to load the pose model."
            )
        self.model = YOLO(model_name)
        self.age = age
        self.independent_walking = independent_walking

        # COCO 17-Keypoint Mapping
        self.KEYPOINT_MAP = {
            0: "Nose", 1: "LEye", 2: "REye", 3: "LEar", 4: "REar",
            5: "LShoulder", 6: "RShoulder", 7: "LElbow", 8: "RElbow",
            9: "LWrist", 10: "RWrist", 11: "LHip", 12: "RHip",
            13: "LKnee", 14: "RKnee", 15: "LAnkle", 16: "RAnkle"
        }

        # Thresholds
        self.WEIGHT_SHIFT_THRESHOLD = 0.15  # fraction; compared as percent (x100)
        self.HIP_DROP_THRESHOLD = 10        # pixels of hip height difference
        self.POSTURE_SWAY_THRESHOLD = 20    # pixels (std of shoulder-centre X)

    def process_video(self, video_path):
        """Process video and extract gait analysis metrics.

        Only the first detection (index 0) of each frame is used.

        Args:
            video_path: Path to the video file.

        Returns:
            The result of ``analyze_gait`` with numpy types converted to
            native Python types.

        Raises:
            FileNotFoundError: If ``video_path`` does not exist.
            ValueError: Propagated from ``analyze_gait``.
        """
        if not os.path.exists(video_path):
            raise FileNotFoundError(f"Video file not found: {video_path}")

        results = self.model(video_path, stream=True, verbose=False)
        frames_data = []

        for i, r in enumerate(results):
            if r.keypoints is None or r.keypoints.data.shape[0] == 0:
                frames_data.append({"frame": i, "keypoints": []})
                continue

            kpts = r.keypoints.data[0].cpu().numpy()
            # Each row is (x, y, confidence); low-confidence points become None
            # so that index positions still line up with KEYPOINT_MAP.
            points = [
                (float(x), float(y)) if conf > self.MIN_KEYPOINT_CONFIDENCE else None
                for x, y, conf in kpts
            ]
            frames_data.append({"frame": int(i), "keypoints": points})

        return self._make_serializable(self.analyze_gait(frames_data))

    def _make_serializable(self, obj):
        """Recursively convert numpy data types to native python types for JSON serialization.

        Dicts, lists and tuples are rebuilt element by element; anything that
        is not a recognised container or scalar is returned unchanged.
        """
        if isinstance(obj, dict):
            return {k: self._make_serializable(v) for k, v in obj.items()}
        if isinstance(obj, list):
            return [self._make_serializable(v) for v in obj]
        if isinstance(obj, tuple):
            return tuple(self._make_serializable(v) for v in obj)
        # bool must be tested before int: bool is an int subclass, and without
        # this branch True/False would be emitted as 1/0.
        if isinstance(obj, (bool, np.bool_)):
            return bool(obj)
        if isinstance(obj, (np.integer, int)):
            return int(obj)
        if isinstance(obj, (np.floating, float)):
            return float(obj)
        if isinstance(obj, np.ndarray):
            return self._make_serializable(obj.tolist())
        return obj

    def calculate_angle(self, p1, p2, p3):
        """Calculate angle between three points.

        Returns the angle at ``p2`` in degrees (0-180), or None when any point
        is missing or either arm of the angle has zero length.
        """
        if p1 is None or p2 is None or p3 is None:
            return None

        a = np.array(p1)
        b = np.array(p2)
        c = np.array(p3)

        ba = a - b
        bc = c - b

        norm_ba = np.linalg.norm(ba)
        norm_bc = np.linalg.norm(bc)
        if norm_ba == 0 or norm_bc == 0:
            return None

        cosine_angle = np.dot(ba, bc) / (norm_ba * norm_bc)
        # Clip guards arccos against rounding slightly outside [-1, 1].
        angle = np.arccos(np.clip(cosine_angle, -1.0, 1.0))
        return np.degrees(angle)

    def calculate_weight_distribution(self, left_ankle, right_ankle, left_hip, right_hip):
        """Calculate weight distribution based on hip and ankle positions.

        The ankles are only required to be present; the estimate itself is
        derived from the hip height difference.

        Returns:
            ``(weight_ratio, primary_side, imbalance_percent)`` where
            ``weight_ratio`` is in [0, 1] (values above 0.5 mean left),
            ``primary_side`` is "left", "right" or "balanced", and
            ``imbalance_percent`` is in [0, 100]. ``(None, None, None)`` when
            any of the four points is missing.
        """
        if any(p is None for p in (left_ankle, right_ankle, left_hip, right_hip)):
            return None, None, None

        left_hip_y = left_hip[1]
        right_hip_y = right_hip[1]

        # In image coordinates, Y increases downward.
        # A higher hip (lower Y value) = stance/weight-bearing side.
        # So if left_hip_y < right_hip_y, the LEFT hip is higher -> weight on LEFT,
        # which must give a POSITIVE value here. (The operands were previously
        # swapped, reporting the opposite side.)
        weight_on_left = right_hip_y - left_hip_y

        # NOTE(review): with the +1 offset this ratio saturates within a few
        # pixels of hip difference, so nearly every frame counts as imbalanced.
        # Formula kept as-is; confirm whether normalising by body size is wanted.
        total_weight = abs(weight_on_left) + 1
        weight_ratio = (weight_on_left + total_weight) / (2 * total_weight)
        weight_ratio = float(np.clip(weight_ratio, 0, 1))

        imbalance_percent = abs(weight_ratio - 0.5) * 200

        if imbalance_percent < 10:
            primary_side = "balanced"
        elif weight_ratio > 0.5:
            primary_side = "left"
        else:
            primary_side = "right"

        return weight_ratio, primary_side, imbalance_percent

    def detect_hip_drop(self, left_hip, right_hip):
        """Detect Trendelenburg gait (hip drop on swing side).

        Returns:
            ``(drop_type, weakness)`` when the hip height difference exceeds
            ``HIP_DROP_THRESHOLD`` pixels, otherwise ``(None, None)``. The
            lower hip (larger Y) is the dropped one and the weakness is
            attributed to the opposite side.
        """
        if left_hip is None or right_hip is None:
            return None, None

        hip_height_diff = abs(left_hip[1] - right_hip[1])
        if hip_height_diff > self.HIP_DROP_THRESHOLD:
            if left_hip[1] > right_hip[1]:
                return "left_hip_drop", "right_side_weakness"
            return "right_hip_drop", "left_side_weakness"

        return None, None

    def assess_postural_strength(self, frames_data):
        """Assess postural strength.

        Frames with fewer than 17 keypoints are skipped. Knee stability is
        only evaluated for frames that already carry an ``"angles"`` entry
        (added by ``analyze_gait``).

        Returns:
            dict with ``trunk_sway`` (std of shoulder-centre X),
            ``shoulder_hiking`` (mean shoulder Y difference), ``forward_lean``
            (mean absolute X offset of nose from shoulder centre) and
            ``knee_stability`` (list of "weak"/"stiff"/"normal" per frame).
        """
        trunk_sway = []
        shoulder_heights = []
        head_forward_lean = []
        knee_stability = []

        for frame_data in frames_data:
            kps = frame_data.get("keypoints", [])
            if not kps or len(kps) < 17:
                continue

            if kps[5] and kps[6]:
                shoulder_center = (kps[5][0] + kps[6][0]) / 2
                trunk_sway.append(shoulder_center)
                shoulder_heights.append(abs(kps[5][1] - kps[6][1]))

                if kps[0]:
                    head_forward_lean.append(kps[0][0] - shoulder_center)

            if "angles" in frame_data:
                lk = frame_data["angles"].get("left_knee")
                rk = frame_data["angles"].get("right_knee")
                # "is not None" rather than truthiness: 0.0 is a valid angle.
                if lk is not None and rk is not None:
                    if lk < 120 and rk < 120:
                        knee_stability.append("weak")
                    elif lk > 160 and rk > 160:
                        knee_stability.append("stiff")
                    else:
                        knee_stability.append("normal")

        return {
            "trunk_sway": np.std(trunk_sway) if trunk_sway else 0,
            "shoulder_hiking": np.mean(shoulder_heights) if shoulder_heights else 0,
            "forward_lean": np.mean(np.abs(head_forward_lean)) if head_forward_lean else 0,
            "knee_stability": knee_stability
        }

    @staticmethod
    def _detect_phases(heights):
        """Label each ankle height as "stance", "swing" or None (missing).

        A sample is "stance" when it is at or above the 80th percentile of the
        valid heights (largest Y values, i.e. lowest in the image).
        """
        if not heights:
            return []

        valid_heights = [h for h in heights if h is not None]
        if not valid_heights:
            return [None] * len(heights)

        ground_threshold = np.percentile(valid_heights, 80)
        return [
            None if h is None else ("stance" if h >= ground_threshold else "swing")
            for h in heights
        ]

    def analyze_gait(self, frames_data):
        """Complete gait analysis with all features.

        Args:
            frames_data: List of dicts with a ``"keypoints"`` list holding 17
                ``(x, y)`` tuples or None per frame. The dicts are modified in
                place: ``"angles"`` and, when computable,
                ``"weight_distribution"`` are added to each usable frame.

        Returns:
            dict with per-frame data, gait phases, weight distribution,
            postural strength, observations, clinical notes and summary scores.

        Raises:
            ValueError: With a ``VIDEO_TOO_SHORT``, ``NO_PERSON_DETECTED`` or
                ``POOR_VISIBILITY`` prefix when the input cannot be analysed.
        """
        # Validation for visibility and person detection
        total_frames = len(frames_data)

        # Check minimum video length — need at least ~5 seconds (150 frames at 30fps)
        # for enough gait cycles to compute meaningful metrics.
        if total_frames < 150:
            raise ValueError("VIDEO_TOO_SHORT: Video is too short for meaningful analysis. Please provide a longer video.")

        # A valid frame has enough confident keypoints (at least 8 keypoints)
        valid_frames = sum(
            1
            for frame_data in frames_data
            if sum(kp is not None for kp in frame_data.get("keypoints", [])) >= 8
        )

        if valid_frames == 0:
            raise ValueError("NO_PERSON_DETECTED: No person could be detected in the video.")

        visibility_ratio = valid_frames / total_frames
        if visibility_ratio < 0.3:
            raise ValueError("POOR_VISIBILITY: Unable to detect clear movement in most frames. This may be due to bad video quality, loose clothing, bad recording angle, or the subject not being clearly visible.")

        analysis_results = {
            "frames": [],
            "summary": {},
            "weight_distribution": {
                "overall_imbalance_percent": 0,
                "primary_weight_side": "unknown",
                "weight_shift_issues": [],
                "hip_drop_analysis": []
            },
            "postural_strength": {
                "trunk_sway_severity": "normal",
                "shoulder_stability": "normal",
                "forward_lean": "normal",
                "knee_stability": "normal",
                "overall_strength_assessment": "adequate"
            },
            "gait_phases": {
                "stance_time_l": 0,
                "stance_time_r": 0,
                "swing_time_l": 0,
                "swing_time_r": 0,
                "double_support_frames": 0,
                "phase_asymmetry": 0
            },
            "risk_indicators": [],
            "flags": {},
            "observations": {
                "trunk": "Neutral",
                "head_neck": "Neutral & mobile",
                "arms_hands": "Free arm swing",
                "lower_limbs": "Not observed",
                "symmetry": "Symmetrical",
                "weight_distribution": "Equal",
                "postural_control": "Good"
            },
            "compensatory_strategies": [],
            "reflex_influence": "None observed",
            "safety_note": "",
            "clinical_notes": []
        }

        left_knee_angles = []
        right_knee_angles = []
        left_hip_angles = []
        right_hip_angles = []
        left_ankle_heights = []
        right_ankle_heights = []
        weight_distributions = []
        hip_drops = []
        trunk_rotations = []
        arm_heights = []
        nose_positions = []

        # First Pass: Calculate angles, weights, and collect gait phase data
        for frame_data in frames_data:
            kps = frame_data.get("keypoints", [])
            if not kps or len(kps) < 17:
                analysis_results["frames"].append(frame_data)
                continue

            # Angles
            l_knee = self.calculate_angle(kps[11], kps[13], kps[15])
            r_knee = self.calculate_angle(kps[12], kps[14], kps[16])
            l_hip = self.calculate_angle(kps[5], kps[11], kps[13])
            r_hip = self.calculate_angle(kps[6], kps[12], kps[14])

            frame_data["angles"] = {
                "left_knee": l_knee, "right_knee": r_knee,
                "left_hip": l_hip, "right_hip": r_hip
            }

            # "is not None" so that a legitimate 0.0 angle is not discarded.
            if l_knee is not None:
                left_knee_angles.append(l_knee)
            if r_knee is not None:
                right_knee_angles.append(r_knee)
            if l_hip is not None:
                left_hip_angles.append(l_hip)
            if r_hip is not None:
                right_hip_angles.append(r_hip)

            # Weight Distribution Analysis
            weight_ratio, primary_side, imbalance = self.calculate_weight_distribution(
                kps[15], kps[16], kps[11], kps[12]
            )
            if weight_ratio is not None:
                weight_distributions.append({
                    "ratio": weight_ratio,
                    "primary_side": primary_side,
                    "imbalance_percent": imbalance
                })
                frame_data["weight_distribution"] = {
                    "ratio": weight_ratio,
                    "primary_side": primary_side,
                    "imbalance_percent": imbalance
                }

            # Hip Drop Detection
            hip_drop, weakness = self.detect_hip_drop(kps[11], kps[12])
            if hip_drop:
                hip_drops.append({"type": hip_drop, "weakness": weakness})

            # Trunk rotation (apparent shoulder width in the image)
            if kps[5] and kps[6]:
                trunk_rotations.append(abs(kps[5][0] - kps[6][0]))

            # Arm height for guarding (left shoulder Y minus left wrist Y)
            if kps[5] and kps[9]:
                arm_heights.append(kps[5][1] - kps[9][1])

            # Ankle heights for gait phase detection; None keeps the left and
            # right series aligned frame-for-frame.
            left_ankle_heights.append(kps[15][1] if kps[15] else None)
            right_ankle_heights.append(kps[16][1] if kps[16] else None)

            # Nose position for head stability
            if kps[0]:
                nose_positions.append(kps[0][1])

            analysis_results["frames"].append(frame_data)

        # Gait Phase Detection
        l_phases = self._detect_phases(left_ankle_heights)
        r_phases = self._detect_phases(right_ankle_heights)

        stance_time_l = l_phases.count("stance")
        swing_time_l = l_phases.count("swing")
        stance_time_r = r_phases.count("stance")
        swing_time_r = r_phases.count("swing")

        double_support_frames = sum(
            1 for lp, rp in zip(l_phases, r_phases)
            if lp == "stance" and rp == "stance"
        )

        phase_asymmetry = 0
        longest_stance = max(stance_time_l, stance_time_r)
        if longest_stance > 0:
            phase_asymmetry = (abs(stance_time_l - stance_time_r) / longest_stance) * 100

        analysis_results["gait_phases"] = {
            "stance_time_l": stance_time_l,
            "stance_time_r": stance_time_r,
            "swing_time_l": swing_time_l,
            "swing_time_r": swing_time_r,
            "double_support_frames": double_support_frames,
            "phase_asymmetry": phase_asymmetry
        }

        # Postural Strength Assessment (relies on the "angles" added above)
        strength_assessment = self.assess_postural_strength(frames_data)

        # Analyze Weight Distribution
        overall_imbalance = 0
        primary_side = "balanced"

        if weight_distributions:
            overall_imbalance = np.mean(
                [w["imbalance_percent"] for w in weight_distributions]
            )
            side_counts = Counter(
                w["primary_side"] for w in weight_distributions
                if w["primary_side"] != "balanced"
            )
            if side_counts:
                # Iterating the sorted keys makes a left/right tie resolve the
                # same way on every run (set order depends on the hash seed).
                primary_side = max(sorted(side_counts), key=side_counts.__getitem__)

            analysis_results["weight_distribution"]["overall_imbalance_percent"] = overall_imbalance
            analysis_results["weight_distribution"]["primary_weight_side"] = primary_side

            if overall_imbalance > self.WEIGHT_SHIFT_THRESHOLD * 100:
                analysis_results["weight_distribution"]["weight_shift_issues"].append(
                    f"Significant weight shift ({overall_imbalance:.1f}% imbalance) - "
                    f"predominantly loading {primary_side} side"
                )
                analysis_results["observations"]["weight_distribution"] = f"Asymmetrical - favoring {primary_side}"
                analysis_results["clinical_notes"].append(
                    f"WEIGHT DISTRIBUTION: Child shows {overall_imbalance:.1f}% weight shift toward {primary_side} side. "
                    f"Suggests pain avoidance, weakness, or motor planning issue on opposite side."
                )

        if hip_drops:
            # Every per-frame event is kept in hip_drop_analysis, but the
            # clinical note is written once per distinct drop type instead of
            # once per frame, which used to flood the notes with duplicates.
            noted_types = set()
            for drop in hip_drops:
                analysis_results["weight_distribution"]["hip_drop_analysis"].append(drop)
                if drop["type"] in noted_types:
                    continue
                noted_types.add(drop["type"])
                analysis_results["clinical_notes"].append(
                    f"HIP DROP: {drop['type']} indicates {drop['weakness']} - "
                    f"weakness in hip abductors or gluteus medius"
                )

        # Postural Strength Scoring
        trunk_sway = strength_assessment["trunk_sway"]
        excessive_sway = bool(trunk_sway > self.POSTURE_SWAY_THRESHOLD)
        if excessive_sway:
            analysis_results["postural_strength"]["trunk_sway_severity"] = "excessive"
            analysis_results["observations"]["postural_control"] = "Weak - excessive sway"
            analysis_results["clinical_notes"].append(
                f"TRUNK SWAY: Significant lateral sway (std: {trunk_sway:.1f}px). "
                f"Suggests weak core stability."
            )

        shoulder_hike = strength_assessment["shoulder_hiking"]
        shoulders_hiking = bool(shoulder_hike > 5)
        if shoulders_hiking:
            analysis_results["postural_strength"]["shoulder_stability"] = "hiking"
            analysis_results["compensatory_strategies"].append("Shoulder hiking for stability")
            analysis_results["clinical_notes"].append(
                "SHOULDER HIKING: Upper trapezius compensation for core/trunk weakness."
            )

        forward_lean = strength_assessment["forward_lean"]
        excessive_lean = bool(forward_lean > 15)
        if excessive_lean:
            analysis_results["postural_strength"]["forward_lean"] = "excessive"
            analysis_results["clinical_notes"].append(
                f"FORWARD HEAD POSTURE: Head forward {forward_lean:.1f}px. "
                f"Weak neck/shoulder stabilizers or balance compensation."
            )

        knee_issues = strength_assessment["knee_stability"]
        # "weak" must be the label on more than half of the assessed frames.
        knees_weak = knee_issues.count("weak") > len(knee_issues) * 0.5
        if knees_weak:
            analysis_results["postural_strength"]["knee_stability"] = "weak"
            analysis_results["clinical_notes"].append(
                "KNEE WEAKNESS: Insufficient extension/flexion control. "
                "Quad weakness or motor planning issues."
            )

        weakness_indicators = sum(
            (excessive_sway, shoulders_hiking, excessive_lean, knees_weak)
        )

        if weakness_indicators >= 2:
            analysis_results["postural_strength"]["overall_strength_assessment"] = "inadequate"
            analysis_results["observations"]["postural_control"] = "Floppy/weak - multiple issues"
            analysis_results["clinical_notes"].append(
                f"POSTURAL ASSESSMENT: Low muscle tone/weakness ({weakness_indicators} issues detected). "
                f"Recommend strength assessment."
            )

        # Original Observations Logic
        # Trunk
        if trunk_rotations:
            sway = np.std(trunk_rotations)
            if sway > 15:
                analysis_results["observations"]["trunk"] = "Stiff / reduced rotation"
                analysis_results["compensatory_strategies"].append("Trunk used for balance")
            elif sway > 10:
                analysis_results["observations"]["trunk"] = "Rotated"
                analysis_results["compensatory_strategies"].append("Trunk rotation")

        # Head & Neck (vertical stability)
        if nose_positions:
            neck_stiffness = np.std(nose_positions)
            if neck_stiffness < 2:
                analysis_results["observations"]["head_neck"] = "Fixed / stiff"
                analysis_results["compensatory_strategies"].append("Neck stiffness for stability")

        # Arms
        if arm_heights:
            avg_arm_h = np.mean(arm_heights)
            # NOTE(review): the value is negative whenever the wrist is below
            # the shoulder, so this fires for arms hanging at the side as well.
            # Confirm that "< -10" is the intended direction for guarding.
            if avg_arm_h < -10:
                analysis_results["observations"]["arms_hands"] = "Hands guarded"
                analysis_results["compensatory_strategies"].append("Hands used for security")

        # Reflex Influence Detection
        signs = []
        if analysis_results["observations"]["trunk"] == "Rotated":
            signs.append("ATNR influence")
        if "Hands guarded" in analysis_results["observations"]["arms_hands"]:
            signs.append("Palmar influence")
        if "Neck stiffness" in str(analysis_results["compensatory_strategies"]):
            signs.append("Fear Paralysis influence")

        if len(signs) >= 2:
            analysis_results["reflex_influence"] = "Movement patterns suggest immature postural and protective responses consistent with incomplete primitive reflex integration."
            analysis_results["observations"]["symmetry"] = "Mild asymmetry"
        else:
            # NOTE(review): this replaces the "None observed" default with an
            # empty string; kept as-is, confirm which value consumers expect.
            analysis_results["reflex_influence"] = ""

        # Numerical Safety Checks
        num_steps = stance_time_l // 10
        is_safe = self.independent_walking and num_steps >= 5 and len(frames_data) > 90

        if not is_safe:
            analysis_results["safety_note"] = "Numerical interpretation limited due to context."

        # Stability: measures trunk steadiness only. Weight imbalance has its own score.
        # Cap the trunk sway penalty at 50 points so the score stays meaningful.
        trunk_sway_penalty = min(50, np.std(trunk_rotations) * 2 if trunk_rotations else 0)
        stability_score = max(0, 100 - trunk_sway_penalty)

        # Symmetry: based on gait phase asymmetry. Each 1% asymmetry = 1 point deducted.
        sym_score = max(0, 100 - phase_asymmetry)

        # Strength: each weakness indicator costs 25 points
        strength_score = max(0, 100 - (weakness_indicators * 25))

        # Weight shift: overall_imbalance is already 0-100% so maps directly
        weight_shift_score = max(0, 100 - overall_imbalance) if weight_distributions else 100

        analysis_results["summary"] = {
            "stability_score": stability_score,
            "symmetry_score": sym_score,
            "weight_shift_score": weight_shift_score,
            "strength_score": strength_score,
            "phase_asymmetry_percent": phase_asymmetry,
            "overall_gait_quality": "concerning" if weakness_indicators >= 2 else "acceptable",
            "walking_condition": "Independent" if self.independent_walking else "Assisted"
        }

        return analysis_results


if __name__ == "__main__":
    analyzer = EnhancedGaitAnalyzer(age=10)
    print("Enhanced Gait Analyzer initialized with all features.")