"""Extract per-frame joint angles from a video with MediaPipe Pose.

Each frame in which a pose is detected is reduced to a set of joint angles
and appended as one row to a CSV log file.
"""

import csv
import json
import logging
import math
import os

# Suppress TensorFlow and MediaPipe logging.
# The variable is set before the third-party imports below so it is already
# in place when native libraries initialise their logging. Level '0' shows
# every message; '2' hides INFO and WARNING and keeps errors.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

import cv2  # noqa: E402
import mediapipe as mp  # noqa: E402
import pandas as pd  # noqa: E402,F401
import matplotlib.pyplot as plt  # noqa: E402
import numpy as np  # noqa: E402

logging.getLogger('mediapipe').setLevel(logging.ERROR)


class PoseEstimationModel:
    """Wrapper around MediaPipe Pose that turns video frames into joint angles."""

    def __init__(self, smooth_landmarks=True):
        """Create the MediaPipe pose estimator.

        Args:
            smooth_landmarks: Forwarded to ``mp.solutions.pose.Pose``. Defaults
                to ``True``, which matches the previous hard-coded value.
        """
        self.smooth_landmarks = smooth_landmarks
        self.mp_pose = mp.solutions.pose
        self.pose_video = self.mp_pose.Pose(smooth_landmarks=smooth_landmarks)

    def detect_pose(self, image, pose, display=True):
        """Run ``pose`` on a BGR image and collect its landmarks.

        Args:
            image: BGR image array (it is converted with ``COLOR_BGR2RGB``
                before being processed).
            pose: Object exposing ``process(rgb_image)``; ``self.pose_video``
                is what ``process_video`` passes in.
            display: When true, show the input and output images side by side
                with matplotlib.

        Returns:
            ``(output_image, landmarks)``. ``output_image`` is an unmodified
            copy of ``image`` (nothing is drawn on it). ``landmarks`` is a list
            of ``(x, y, z)`` tuples where x and y are integer pixel positions
            and z is the raw landmark z multiplied by the image width; the
            list is empty when no pose was found.
        """
        output_image = image.copy()
        image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        results = pose.process(image_rgb)
        height, width, _ = image.shape

        landmarks = []
        if results.pose_landmarks:
            landmarks = [
                (int(lm.x * width), int(lm.y * height), (lm.z * width))
                for lm in results.pose_landmarks.landmark
            ]

        if display:
            plt.figure(figsize=[22, 22])
            plt.subplot(121)
            plt.imshow(image[:, :, ::-1])
            plt.title("Original Image")
            plt.axis('off')
            plt.subplot(122)
            plt.imshow(output_image[:, :, ::-1])
            plt.title("Output Image")
            plt.axis('off')
            plt.show()

        # Previously nothing was returned when display=True; returning in both
        # modes keeps the result usable without changing display=False callers.
        return output_image, landmarks

    def calculate_angle(self, landmark1, landmark2, landmark3):
        """Return the angle at ``landmark2`` between the other two landmarks.

        Only the x and y components are used. The result is in degrees,
        folded into the range [0, 180] and rounded to two decimals.
        """
        x1, y1, _ = landmark1
        x2, y2, _ = landmark2
        x3, y3, _ = landmark3

        angle = math.degrees(
            math.atan2(y3 - y2, x3 - x2) - math.atan2(y1 - y2, x1 - x2)
        )
        if angle < 0:
            angle += 360
        # Report the smaller of the two angles between the segments.
        if angle > 180:
            angle = 360 - angle
        return round(angle, 2)

    def calculate_distance(self, point1, point2):
        """Return the Euclidean distance between two points, rounded to 2 decimals.

        All components of the points take part in the norm.
        """
        length = np.linalg.norm(np.array(point1) - np.array(point2))
        return round(length, 2)

    def body_angles(self, landmarks):
        """Compute joint angles and body dimensions from a landmark list.

        Args:
            landmarks: Sequence indexable by ``PoseLandmark`` values, as
                produced by ``detect_pose``.

        Returns:
            Dict with ``[left, right]`` pairs for each angle group and for
            ``leg_lengths``, plus a nested ``body_dimensions`` dict.
        """
        lm = self.mp_pose.PoseLandmark

        def angle(first, vertex, last):
            # Angle at `vertex`, looked up by landmark enum member.
            return self.calculate_angle(
                landmarks[first.value],
                landmarks[vertex.value],
                landmarks[last.value],
            )

        def distance(start, end):
            # Distance between two landmarks, looked up by enum member.
            return self.calculate_distance(
                landmarks[start.value], landmarks[end.value]
            )

        left_elbow_angle = angle(lm.LEFT_SHOULDER, lm.LEFT_ELBOW, lm.LEFT_WRIST)
        right_elbow_angle = angle(lm.RIGHT_SHOULDER, lm.RIGHT_ELBOW, lm.RIGHT_WRIST)

        left_shoulder_angle = angle(lm.LEFT_ELBOW, lm.LEFT_SHOULDER, lm.LEFT_HIP)
        right_shoulder_angle = angle(lm.RIGHT_HIP, lm.RIGHT_SHOULDER, lm.RIGHT_ELBOW)

        left_knee_angle = angle(lm.LEFT_HIP, lm.LEFT_KNEE, lm.LEFT_ANKLE)
        right_knee_angle = angle(lm.RIGHT_HIP, lm.RIGHT_KNEE, lm.RIGHT_ANKLE)

        left_ankle_angle = angle(lm.LEFT_KNEE, lm.LEFT_ANKLE, lm.LEFT_FOOT_INDEX)
        right_ankle_angle = angle(lm.RIGHT_KNEE, lm.RIGHT_ANKLE, lm.RIGHT_FOOT_INDEX)

        right_hip_angle = angle(lm.RIGHT_SHOULDER, lm.RIGHT_HIP, lm.RIGHT_KNEE)
        left_hip_angle = angle(lm.LEFT_SHOULDER, lm.LEFT_HIP, lm.LEFT_KNEE)

        right_hip_to_hip = angle(lm.RIGHT_KNEE, lm.RIGHT_HIP, lm.LEFT_HIP)
        left_hip_to_hip = angle(lm.RIGHT_HIP, lm.LEFT_HIP, lm.LEFT_KNEE)

        left_wrist_pinky_angle = angle(lm.LEFT_ELBOW, lm.LEFT_WRIST, lm.LEFT_PINKY)
        right_wrist_pinky_angle = angle(lm.RIGHT_ELBOW, lm.RIGHT_WRIST, lm.RIGHT_PINKY)

        left_leg_length = distance(lm.LEFT_HIP, lm.LEFT_ANKLE)
        right_leg_length = distance(lm.RIGHT_HIP, lm.RIGHT_ANKLE)
        shoulder_width = distance(lm.LEFT_SHOULDER, lm.RIGHT_SHOULDER)
        hip_width = distance(lm.LEFT_HIP, lm.RIGHT_HIP)
        torso_height = distance(lm.LEFT_SHOULDER, lm.LEFT_HIP)

        return {
            "elbow_angles": [left_elbow_angle, right_elbow_angle],
            "shoulder_angles": [left_shoulder_angle, right_shoulder_angle],
            "knee_angles": [left_knee_angle, right_knee_angle],
            "ankle_angles": [left_ankle_angle, right_ankle_angle],
            "hip_angles": [left_hip_angle, right_hip_angle],
            "wrist_pinky_angles": [left_wrist_pinky_angle, right_wrist_pinky_angle],
            "hip_to_hip": [left_hip_to_hip, right_hip_to_hip],
            "leg_lengths": [left_leg_length, right_leg_length],
            "body_dimensions": {
                "shoulder_width": shoulder_width,
                "hip_width": hip_width,
                "torso_height": torso_height,
            },
        }

    def process_video(self, video_path: str, log_file: str):
        """Log joint angles for every frame of a video in which a pose is found.

        An existing ``log_file`` is deleted first, so each call starts a fresh
        log. Frames without landmarks are skipped.

        Args:
            video_path: Path handed to ``cv2.VideoCapture``.
            log_file: CSV file that receives one row per processed frame.

        Raises:
            Exception: If the video cannot be opened.
        """
        provided_video = cv2.VideoCapture(video_path)
        if not provided_video.isOpened():
            provided_video.release()
            raise Exception("Cannot open video file.")

        # try/finally so the capture handle is released even when a frame
        # fails part-way through the video.
        try:
            if os.path.exists(log_file):
                os.remove(log_file)

            while provided_video.isOpened():
                ok, frame = provided_video.read()
                if not ok:
                    break

                frame, landmarks = self.detect_pose(
                    frame, self.pose_video, display=False
                )
                if landmarks:
                    body_points = self.body_angles(landmarks)
                    self.log_landmarks(body_points, log_file)
        finally:
            provided_video.release()

    def log_landmarks(self, body_points: dict, log_file: str):
        """Append one CSV row of joint angles to ``log_file``.

        A header row is written first when the file is empty. Only the angle
        groups of ``body_points`` are logged; ``leg_lengths`` and
        ``body_dimensions`` are not written.

        Args:
            body_points: Dict in the shape returned by ``body_angles``.
            log_file: Path of the CSV file, opened in append mode.
        """
        log_entry = {
            "left_elbow_angles": body_points["elbow_angles"][0],
            "right_elbow_angles": body_points["elbow_angles"][1],
            "left_shoulder_angles": body_points["shoulder_angles"][0],
            "right_shoulder_angles": body_points["shoulder_angles"][1],
            "left_knee_angles": body_points["knee_angles"][0],
            "right_knee_angles": body_points["knee_angles"][1],
            "left_ankle_angles": body_points["ankle_angles"][0],
            "right_ankle_angles": body_points["ankle_angles"][1],
            "left_hip_angles": body_points["hip_angles"][0],
            "right_hip_angles": body_points["hip_angles"][1],
            "left_wrist_pinky_angle": body_points["wrist_pinky_angles"][0],
            "right_wrist_pinky_angle": body_points["wrist_pinky_angles"][1],
            "left_hip_to_hip": body_points["hip_to_hip"][0],
            "right_hip_to_hip": body_points["hip_to_hip"][1],
        }

        with open(log_file, "a", newline='') as file:
            writer = csv.writer(file)
            if file.tell() == 0:
                writer.writerow(log_entry.keys())  # Write header if empty
            writer.writerow(log_entry.values())

    def process_video_and_scale(self, video_path: str, csv_path: str, json_path: str):
        """Process a video into ``csv_path`` and then scale it into ``json_path``.

        Returns:
            Dict with the ``csv_path`` and ``json_path`` that were passed in.
        """
        self.process_video(video_path, csv_path)
        # NOTE(review): `std_scaler` is not defined in the part of the class
        # visible here; unless it is provided elsewhere this call raises
        # AttributeError -- confirm where it is meant to come from.
        self.std_scaler(csv_path, json_path)
        return {"csv_path": csv_path, "json_path": json_path}

    def predict(self, instances):
        """Run ``process_video_and_scale`` on the first entry of ``instances``.

        Args:
            instances: Non-empty sequence whose first item is a dict with
                'video_path', 'csv_path' and 'json_path' keys.

        Returns:
            ``{"predictions": result}`` where ``result`` is the dict returned
            by ``process_video_and_scale``.

        Raises:
            ValueError: If ``instances`` is empty or any of the three paths is
                missing from its first item.
        """
        required_keys = ("video_path", "csv_path", "json_path")
        # Check all three paths up front: a missing csv_path/json_path used to
        # surface later as an unrelated TypeError deep inside the pipeline.
        if not instances or any(
            instances[0].get(key) is None for key in required_keys
        ):
            raise ValueError(
                "Invalid input format. Expected a dictionary with "
                "'video_path', 'csv_path', and 'json_path' keys."
            )

        video_path = instances[0].get("video_path")
        csv_path = instances[0].get("csv_path")
        json_path = instances[0].get("json_path")

        print(f"Processing video: {video_path}, saving to {csv_path}, normalizing data in {json_path}")
        result = self.process_video_and_scale(video_path, csv_path, json_path)
        return {"predictions": result}

    def save_model_config(self, filename):
        """Write this model's configuration to ``filename`` as JSON."""
        # getattr keeps this working for instances that never ran __init__.
        config = {"smooth_landmarks": getattr(self, "smooth_landmarks", True)}
        with open(filename, "w") as file:
            json.dump(config, file)
        print(f"Model configuration saved to {filename}")

    @staticmethod
    def load_model_config(filename):
        """Build a model from a JSON configuration written by ``save_model_config``.

        The ``smooth_landmarks`` entry is applied to the new model; when it is
        absent the default of ``True`` is used.
        """
        with open(filename, "r") as file:
            config = json.load(file)

        smooth_landmarks = True
        if isinstance(config, dict):
            smooth_landmarks = config.get("smooth_landmarks", True)

        model = PoseEstimationModel(smooth_landmarks=smooth_landmarks)
        print(f"Model configuration loaded from {filename}")
        return model


if __name__ == "__main__":
    model = PoseEstimationModel()
    model.save_model_config('model_config.json')

    loaded_model = PoseEstimationModel.load_model_config('model_config.json')

    video_path = 'input_video.mp4'
    log_file = 'output_log.csv'
    loaded_model.process_video(video_path, log_file)
    print("Video processed successfully! Pose data saved in:", log_file)