gait-analysis-backend / gait_analysis.py
techSnipe's picture
Upload folder using huggingface_hub
83ee618 verified
import cv2
import numpy as np
import os
from ultralytics import YOLO
from collections import deque
class EnhancedGaitAnalyzer:
def __init__(self, model_name="yolo11n-pose.pt", age=4, independent_walking=True):
"""
Enhanced Gait Analyzer with weight shifting, postural strength, AND original gait analysis.
"""
self.model = YOLO(model_name)
self.age = age
self.independent_walking = independent_walking
# COCO 17-Keypoint Mapping
self.KEYPOINT_MAP = {
0: "Nose", 1: "LEye", 2: "REye", 3: "LEar", 4: "REar",
5: "LShoulder", 6: "RShoulder", 7: "LElbow", 8: "RElbow",
9: "LWrist", 10: "RWrist", 11: "LHip", 12: "RHip",
13: "LKnee", 14: "RKnee", 15: "LAnkle", 16: "RAnkle"
}
# Thresholds
self.WEIGHT_SHIFT_THRESHOLD = 0.15
self.HIP_DROP_THRESHOLD = 10
self.POSTURE_SWAY_THRESHOLD = 20
def process_video(self, video_path):
"""Process video and extract gait analysis metrics."""
if not os.path.exists(video_path):
raise FileNotFoundError(f"Video file not found: {video_path}")
results = self.model(video_path, stream=True, verbose=False)
frames_data = []
for i, r in enumerate(results):
if r.keypoints is None or r.keypoints.data.shape[0] == 0:
frames_data.append({"frame": i, "keypoints": []})
continue
kpts = r.keypoints.data[0].cpu().numpy()
points = []
for kp in kpts:
x, y, conf = kp
if conf > 0.3:
points.append((float(x), float(y)))
else:
points.append(None)
frames_data.append({"frame": int(i), "keypoints": points})
return self._make_serializable(self.analyze_gait(frames_data))
def _make_serializable(self, obj):
"""Recursively convert numpy data types to native python types for JSON serialization."""
if isinstance(obj, dict):
return {k: self._make_serializable(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [self._make_serializable(v) for v in obj]
elif isinstance(obj, tuple):
return tuple(self._make_serializable(v) for v in obj)
elif isinstance(obj, (np.integer, int)):
return int(obj)
elif isinstance(obj, (np.floating, float)):
return float(obj)
elif isinstance(obj, np.ndarray):
return self._make_serializable(obj.tolist())
else:
return obj
def calculate_angle(self, p1, p2, p3):
"""Calculate angle between three points."""
if p1 is None or p2 is None or p3 is None:
return None
a = np.array(p1)
b = np.array(p2)
c = np.array(p3)
ba = a - b
bc = c - b
norm_ba = np.linalg.norm(ba)
norm_bc = np.linalg.norm(bc)
if norm_ba == 0 or norm_bc == 0:
return None
cosine_angle = np.dot(ba, bc) / (norm_ba * norm_bc)
angle = np.arccos(np.clip(cosine_angle, -1.0, 1.0))
return np.degrees(angle)
def calculate_weight_distribution(self, left_ankle, right_ankle, left_hip, right_hip):
"""Calculate weight distribution based on hip and ankle positions."""
if not all([left_ankle, right_ankle, left_hip, right_hip]):
return None, None, None
left_hip_y = left_hip[1]
right_hip_y = right_hip[1]
# In image coordinates, Y increases downward.
# A higher hip (lower Y value) = stance/weight-bearing side.
# So if left_hip_y < right_hip_y, the LEFT hip is higher → weight on LEFT.
weight_on_left = (left_hip_y - right_hip_y)
total_weight = abs(weight_on_left) + 1
weight_ratio = (weight_on_left + total_weight) / (2 * total_weight)
weight_ratio = np.clip(weight_ratio, 0, 1)
imbalance_percent = abs(weight_ratio - 0.5) * 200
if imbalance_percent < 10:
primary_side = "balanced"
elif weight_ratio > 0.5:
primary_side = "left"
else:
primary_side = "right"
return weight_ratio, primary_side, imbalance_percent
def detect_hip_drop(self, left_hip, right_hip):
"""Detect Trendelenburg gait (hip drop on swing side)."""
if not left_hip or not right_hip:
return None, None
hip_height_diff = abs(left_hip[1] - right_hip[1])
if hip_height_diff > self.HIP_DROP_THRESHOLD:
if left_hip[1] > right_hip[1]:
return "left_hip_drop", "right_side_weakness"
else:
return "right_hip_drop", "left_side_weakness"
return None, None
def assess_postural_strength(self, frames_data):
"""Assess postural strength."""
trunk_sway = []
shoulder_heights = []
head_forward_lean = []
knee_stability = []
for frame_data in frames_data:
kps = frame_data.get("keypoints", [])
if not kps or len(kps) < 17:
continue
if kps[5] and kps[6]:
shoulder_center = (kps[5][0] + kps[6][0]) / 2
trunk_sway.append(shoulder_center)
shoulder_diff = abs(kps[5][1] - kps[6][1])
shoulder_heights.append(shoulder_diff)
if kps[0] and kps[5] and kps[6]:
nose_to_shoulder = kps[0][0] - (kps[5][0] + kps[6][0]) / 2
head_forward_lean.append(nose_to_shoulder)
if "angles" in frame_data:
lk = frame_data["angles"].get("left_knee")
rk = frame_data["angles"].get("right_knee")
if lk and rk:
if lk < 120 and rk < 120:
knee_stability.append("weak")
elif lk > 160 and rk > 160:
knee_stability.append("stiff")
else:
knee_stability.append("normal")
return {
"trunk_sway": np.std(trunk_sway) if trunk_sway else 0,
"shoulder_hiking": np.mean(shoulder_heights) if shoulder_heights else 0,
"forward_lean": np.mean(np.abs(head_forward_lean)) if head_forward_lean else 0,
"knee_stability": knee_stability
}
def analyze_gait(self, frames_data):
"""Complete gait analysis with all features."""
# Validation for visibility and person detection
total_frames = len(frames_data)
# Check minimum video length — need at least ~5 seconds (150 frames at 30fps)
# for enough gait cycles to compute meaningful metrics.
if total_frames < 150:
raise ValueError("VIDEO_TOO_SHORT: Video is too short for meaningful analysis. Please provide a longer video.")
valid_frames = 0
for frame_data in frames_data:
kps = frame_data.get("keypoints", [])
# A valid frame has enough confident keypoints (at least 8 keypoints)
if kps and len([kp for kp in kps if kp is not None]) >= 8:
valid_frames += 1
if valid_frames == 0:
raise ValueError("NO_PERSON_DETECTED: No person could be detected in the video.")
visibility_ratio = valid_frames / total_frames
if visibility_ratio < 0.3:
raise ValueError("POOR_VISIBILITY: Unable to detect clear movement in most frames. This may be due to bad video quality, loose clothing, bad recording angle, or the subject not being clearly visible.")
analysis_results = {
"frames": [],
"summary": {},
"weight_distribution": {
"overall_imbalance_percent": 0,
"primary_weight_side": "unknown",
"weight_shift_issues": [],
"hip_drop_analysis": []
},
"postural_strength": {
"trunk_sway_severity": "normal",
"shoulder_stability": "normal",
"forward_lean": "normal",
"knee_stability": "normal",
"overall_strength_assessment": "adequate"
},
"gait_phases": {
"stance_time_l": 0,
"stance_time_r": 0,
"swing_time_l": 0,
"swing_time_r": 0,
"double_support_frames": 0,
"phase_asymmetry": 0
},
"risk_indicators": [],
"flags": {},
"observations": {
"trunk": "Neutral",
"head_neck": "Neutral & mobile",
"arms_hands": "Free arm swing",
"lower_limbs": "Not observed",
"symmetry": "Symmetrical",
"weight_distribution": "Equal",
"postural_control": "Good"
},
"compensatory_strategies": [],
"reflex_influence": "None observed",
"safety_note": "",
"clinical_notes": []
}
left_knee_angles = []
right_knee_angles = []
left_hip_angles = []
right_hip_angles = []
left_ankle_heights = []
right_ankle_heights = []
weight_distributions = []
hip_drops = []
trunk_rotations = []
arm_heights = []
nose_positions = []
# First Pass: Calculate angles, weights, and collect gait phase data
for frame_data in frames_data:
kps = frame_data["keypoints"]
if not kps or len(kps) < 17:
analysis_results["frames"].append(frame_data)
continue
# Angles
l_knee = self.calculate_angle(kps[11], kps[13], kps[15])
r_knee = self.calculate_angle(kps[12], kps[14], kps[16])
l_hip = self.calculate_angle(kps[5], kps[11], kps[13])
r_hip = self.calculate_angle(kps[6], kps[12], kps[14])
frame_data["angles"] = {
"left_knee": l_knee,
"right_knee": r_knee,
"left_hip": l_hip,
"right_hip": r_hip
}
if l_knee: left_knee_angles.append(l_knee)
if r_knee: right_knee_angles.append(r_knee)
if l_hip: left_hip_angles.append(l_hip)
if r_hip: right_hip_angles.append(r_hip)
# Weight Distribution Analysis
weight_ratio, primary_side, imbalance = self.calculate_weight_distribution(
kps[15], kps[16], kps[11], kps[12]
)
if weight_ratio is not None:
weight_distributions.append({
"ratio": weight_ratio,
"primary_side": primary_side,
"imbalance_percent": imbalance
})
frame_data["weight_distribution"] = {
"ratio": weight_ratio,
"primary_side": primary_side,
"imbalance_percent": imbalance
}
# Hip Drop Detection
hip_drop, weakness = self.detect_hip_drop(kps[11], kps[12])
if hip_drop:
hip_drops.append({"type": hip_drop, "weakness": weakness})
# Trunk rotation
if kps[5] and kps[6]:
trunk_rotations.append(abs(kps[5][0] - kps[6][0]))
# Arm height for guarding
if kps[5] and kps[9]:
arm_heights.append(kps[5][1] - kps[9][1])
# Ankle heights for gait phase detection
if kps[15]:
left_ankle_heights.append(kps[15][1])
else:
left_ankle_heights.append(None)
if kps[16]:
right_ankle_heights.append(kps[16][1])
else:
right_ankle_heights.append(None)
# Nose position for head stability
if kps[0]:
nose_positions.append(kps[0][1])
analysis_results["frames"].append(frame_data)
# Gait Phase Detection
def detect_phases(heights):
phases = []
if not heights: return phases
valid_heights = [h for h in heights if h is not None]
if not valid_heights: return [None] * len(heights)
ground_threshold = np.percentile(valid_heights, 80)
for h in heights:
if h is None: phases.append(None)
elif h >= ground_threshold: phases.append("stance")
else: phases.append("swing")
return phases
l_phases = detect_phases(left_ankle_heights)
r_phases = detect_phases(right_ankle_heights)
stance_time_l = l_phases.count("stance")
swing_time_l = l_phases.count("swing")
stance_time_r = r_phases.count("stance")
swing_time_r = r_phases.count("swing")
double_support_frames = 0
for lp, rp in zip(l_phases, r_phases):
if lp == "stance" and rp == "stance":
double_support_frames += 1
phase_asymmetry = 0
if max(stance_time_l, stance_time_r) > 0:
phase_asymmetry = (abs(stance_time_l - stance_time_r) / max(stance_time_l, stance_time_r)) * 100
analysis_results["gait_phases"] = {
"stance_time_l": stance_time_l,
"stance_time_r": stance_time_r,
"swing_time_l": swing_time_l,
"swing_time_r": swing_time_r,
"double_support_frames": double_support_frames,
"phase_asymmetry": phase_asymmetry
}
# Postural Strength Assessment
strength_assessment = self.assess_postural_strength(frames_data)
# Analyze Weight Distribution
overall_imbalance = 0
primary_side = "balanced"
if weight_distributions:
imbalances = [w["imbalance_percent"] for w in weight_distributions]
overall_imbalance = np.mean(imbalances)
primary_sides = [w["primary_side"] for w in weight_distributions]
primary_side = max(set([s for s in primary_sides if s != "balanced"]),
key=primary_sides.count, default="balanced")
analysis_results["weight_distribution"]["overall_imbalance_percent"] = overall_imbalance
analysis_results["weight_distribution"]["primary_weight_side"] = primary_side
if overall_imbalance > self.WEIGHT_SHIFT_THRESHOLD * 100:
analysis_results["weight_distribution"]["weight_shift_issues"].append(
f"Significant weight shift ({overall_imbalance:.1f}% imbalance) - "
f"predominantly loading {primary_side} side"
)
analysis_results["observations"]["weight_distribution"] = f"Asymmetrical - favoring {primary_side}"
analysis_results["clinical_notes"].append(
f"WEIGHT DISTRIBUTION: Child shows {overall_imbalance:.1f}% weight shift toward {primary_side} side. "
f"Suggests pain avoidance, weakness, or motor planning issue on opposite side."
)
if hip_drops:
for drop in hip_drops:
analysis_results["weight_distribution"]["hip_drop_analysis"].append(drop)
analysis_results["clinical_notes"].append(
f"HIP DROP: {drop['type']} indicates {drop['weakness']} - "
f"weakness in hip abductors or gluteus medius"
)
# Postural Strength Scoring
trunk_sway = strength_assessment["trunk_sway"]
if trunk_sway > self.POSTURE_SWAY_THRESHOLD:
analysis_results["postural_strength"]["trunk_sway_severity"] = "excessive"
analysis_results["observations"]["postural_control"] = "Weak - excessive sway"
analysis_results["clinical_notes"].append(
f"TRUNK SWAY: Significant lateral sway (std: {trunk_sway:.1f}px). "
f"Suggests weak core stability."
)
shoulder_hike = strength_assessment["shoulder_hiking"]
if shoulder_hike > 5:
analysis_results["postural_strength"]["shoulder_stability"] = "hiking"
analysis_results["compensatory_strategies"].append("Shoulder hiking for stability")
analysis_results["clinical_notes"].append(
f"SHOULDER HIKING: Upper trapezius compensation for core/trunk weakness."
)
forward_lean = strength_assessment["forward_lean"]
if forward_lean > 15:
analysis_results["postural_strength"]["forward_lean"] = "excessive"
analysis_results["clinical_notes"].append(
f"FORWARD HEAD POSTURE: Head forward {forward_lean:.1f}px. "
f"Weak neck/shoulder stabilizers or balance compensation."
)
knee_issues = strength_assessment["knee_stability"]
if knee_issues.count("weak") > len(knee_issues) * 0.5:
analysis_results["postural_strength"]["knee_stability"] = "weak"
analysis_results["clinical_notes"].append(
f"KNEE WEAKNESS: Insufficient extension/flexion control. "
f"Quad weakness or motor planning issues."
)
weakness_indicators = sum([
trunk_sway > self.POSTURE_SWAY_THRESHOLD,
shoulder_hike > 5,
forward_lean > 15,
knee_issues.count("weak") > len(knee_issues) * 0.5
])
if weakness_indicators >= 2:
analysis_results["postural_strength"]["overall_strength_assessment"] = "inadequate"
analysis_results["observations"]["postural_control"] = "Floppy/weak - multiple issues"
analysis_results["clinical_notes"].append(
f"POSTURAL ASSESSMENT: Low muscle tone/weakness ({weakness_indicators} issues detected). "
f"Recommend strength assessment."
)
# Original Observations Logic
# Trunk
if trunk_rotations:
sway = np.std(trunk_rotations)
if sway > 15:
analysis_results["observations"]["trunk"] = "Stiff / reduced rotation"
analysis_results["compensatory_strategies"].append("Trunk used for balance")
elif sway > 10:
analysis_results["observations"]["trunk"] = "Rotated"
analysis_results["compensatory_strategies"].append("Trunk rotation")
# Head & Neck (vertical stability)
if nose_positions:
neck_stiffness = np.std(nose_positions)
if neck_stiffness < 2:
analysis_results["observations"]["head_neck"] = "Fixed / stiff"
analysis_results["compensatory_strategies"].append("Neck stiffness for stability")
# Arms
if arm_heights:
avg_arm_h = np.mean(arm_heights)
if avg_arm_h < -10:
analysis_results["observations"]["arms_hands"] = "Hands guarded"
analysis_results["compensatory_strategies"].append("Hands used for security")
# Reflex Influence Detection
signs = []
if analysis_results["observations"]["trunk"] == "Rotated":
signs.append("ATNR influence")
if "Hands guarded" in analysis_results["observations"]["arms_hands"]:
signs.append("Palmar influence")
if "Neck stiffness" in str(analysis_results["compensatory_strategies"]):
signs.append("Fear Paralysis influence")
if len(signs) >= 2:
analysis_results["reflex_influence"] = "Movement patterns suggest immature postural and protective responses consistent with incomplete primitive reflex integration."
analysis_results["observations"]["symmetry"] = "Mild asymmetry"
else:
analysis_results["reflex_influence"] = ""
# Numerical Safety Checks
num_steps = stance_time_l // 10
is_safe = self.independent_walking and num_steps >= 5 and len(frames_data) > 90
if not is_safe:
analysis_results["safety_note"] = "Numerical interpretation limited due to context."
# Stability: measures trunk steadiness only. Weight imbalance has its own score.
# Cap the trunk sway penalty at 50 points so the score stays meaningful.
trunk_sway_penalty = min(50, np.std(trunk_rotations) * 2 if trunk_rotations else 0)
stability_score = max(0, 100 - trunk_sway_penalty)
# Symmetry: based on gait phase asymmetry. Each 1% asymmetry = 1 point deducted.
sym_score = max(0, 100 - phase_asymmetry)
# Strength: each weakness indicator costs 25 points
strength_score = max(0, 100 - (weakness_indicators * 25))
# Weight shift: overall_imbalance is already 0-100% so maps directly
weight_shift_score = max(0, 100 - overall_imbalance) if weight_distributions else 100
analysis_results["summary"] = {
"stability_score": stability_score,
"symmetry_score": sym_score,
"weight_shift_score": weight_shift_score,
"strength_score": strength_score,
"phase_asymmetry_percent": phase_asymmetry,
"overall_gait_quality": "concerning" if weakness_indicators >= 2 else "acceptable",
"walking_condition": "Independent" if self.independent_walking else "Assisted"
}
return analysis_results
if __name__ == "__main__":
analyzer = EnhancedGaitAnalyzer(age=10)
print("Enhanced Gait Analyzer initialized with all features.")