|
|
import cv2
|
|
|
import csv
|
|
|
import os
|
|
|
import mediapipe as mp
|
|
|
import json
|
|
|
import pandas as pd
|
|
|
import matplotlib.pyplot as plt
|
|
|
import numpy as np
|
|
|
import math
|
|
|
import logging
|
|
|
|
|
|
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '0'
|
|
|
logging.getLogger('mediapipe').setLevel(logging.ERROR)
|
|
|
|
|
|
class PoseEstimationModel:
|
|
|
def __init__(self):
|
|
|
self.mp_pose = mp.solutions.pose
|
|
|
self.pose_video = self.mp_pose.Pose(smooth_landmarks=True)
|
|
|
|
|
|
def detect_pose(self, image, pose, display=True):
|
|
|
output_image = image.copy()
|
|
|
imageRGB = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
|
|
|
results = pose.process(imageRGB)
|
|
|
height, width, _ = image.shape
|
|
|
landmarks = []
|
|
|
if results.pose_landmarks:
|
|
|
for landmark in results.pose_landmarks.landmark:
|
|
|
landmarks.append((int(landmark.x * width), int(landmark.y * height),
|
|
|
(landmark.z * width)))
|
|
|
if display:
|
|
|
plt.figure(figsize=[22,22])
|
|
|
plt.subplot(121);plt.imshow(image[:,:,::-1]);plt.title("Original Image");plt.axis('off');
|
|
|
plt.subplot(122);plt.imshow(output_image[:,:,::-1]);plt.title("Output Image");plt.axis('off');
|
|
|
plt.show()
|
|
|
else:
|
|
|
return output_image, landmarks
|
|
|
|
|
|
def calculate_angle(self, landmark1, landmark2, landmark3):
|
|
|
x1, y1, _ = landmark1
|
|
|
x2, y2, _ = landmark2
|
|
|
x3, y3, _ = landmark3
|
|
|
angle = math.degrees(math.atan2(y3 - y2, x3 - x2) - math.atan2(y1 - y2, x1 - x2))
|
|
|
if angle < 0:
|
|
|
angle += 360
|
|
|
if angle > 180:
|
|
|
angle = 360 - angle
|
|
|
|
|
|
return round(angle, 2)
|
|
|
|
|
|
def calculate_distance(self, point1, point2):
|
|
|
length = np.linalg.norm(np.array(point1) - np.array(point2))
|
|
|
return round(length, 2)
|
|
|
|
|
|
def body_angles(self, landmarks):
|
|
|
left_elbow_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.LEFT_SHOULDER.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.LEFT_ELBOW.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.LEFT_WRIST.value])
|
|
|
right_elbow_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.RIGHT_ELBOW.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.RIGHT_WRIST.value])
|
|
|
left_shoulder_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.LEFT_ELBOW.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.LEFT_SHOULDER.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.LEFT_HIP.value])
|
|
|
right_shoulder_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.RIGHT_HIP.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.RIGHT_ELBOW.value])
|
|
|
left_knee_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.LEFT_HIP.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.LEFT_KNEE.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.LEFT_ANKLE.value])
|
|
|
right_knee_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.RIGHT_HIP.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.RIGHT_KNEE.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.RIGHT_ANKLE.value])
|
|
|
|
|
|
left_ankle_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.LEFT_KNEE.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.LEFT_ANKLE.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.LEFT_FOOT_INDEX.value])
|
|
|
|
|
|
right_ankle_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.RIGHT_KNEE.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.RIGHT_ANKLE.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.RIGHT_FOOT_INDEX.value])
|
|
|
|
|
|
right_hip_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.RIGHT_HIP.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.RIGHT_KNEE.value])
|
|
|
|
|
|
left_hip_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.LEFT_SHOULDER.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.LEFT_HIP.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.LEFT_KNEE.value])
|
|
|
|
|
|
right_hip_to_hip = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.RIGHT_KNEE.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.RIGHT_HIP.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.LEFT_HIP.value])
|
|
|
|
|
|
left_hip_to_hip = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.RIGHT_HIP.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.LEFT_HIP.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.LEFT_KNEE.value])
|
|
|
|
|
|
left_wrist_pinky_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.LEFT_ELBOW.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.LEFT_WRIST.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.LEFT_PINKY.value])
|
|
|
|
|
|
right_wrist_pinky_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.RIGHT_ELBOW.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.RIGHT_WRIST.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.RIGHT_PINKY.value])
|
|
|
|
|
|
left_leg_length = self.calculate_distance(
|
|
|
landmarks[self.mp_pose.PoseLandmark.LEFT_HIP.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.LEFT_ANKLE.value]
|
|
|
)
|
|
|
right_leg_length = self.calculate_distance(
|
|
|
landmarks[self.mp_pose.PoseLandmark.RIGHT_HIP.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.RIGHT_ANKLE.value]
|
|
|
)
|
|
|
shoulder_width = self.calculate_distance(
|
|
|
landmarks[self.mp_pose.PoseLandmark.LEFT_SHOULDER.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value]
|
|
|
)
|
|
|
hip_width = self.calculate_distance(
|
|
|
landmarks[self.mp_pose.PoseLandmark.LEFT_HIP.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.RIGHT_HIP.value]
|
|
|
)
|
|
|
torso_height = self.calculate_distance(
|
|
|
landmarks[self.mp_pose.PoseLandmark.LEFT_SHOULDER.value],
|
|
|
landmarks[self.mp_pose.PoseLandmark.LEFT_HIP.value]
|
|
|
)
|
|
|
|
|
|
return {
|
|
|
"elbow_angles": [left_elbow_angle, right_elbow_angle],
|
|
|
"shoulder_angles": [left_shoulder_angle, right_shoulder_angle],
|
|
|
"knee_angles": [left_knee_angle, right_knee_angle],
|
|
|
"ankle_angles": [left_ankle_angle, right_ankle_angle],
|
|
|
"hip_angles": [left_hip_angle, right_hip_angle],
|
|
|
"wrist_pinky_angles": [left_wrist_pinky_angle, right_wrist_pinky_angle],
|
|
|
"hip_to_hip": [left_hip_to_hip, right_hip_to_hip],
|
|
|
"leg_lengths": [left_leg_length, right_leg_length],
|
|
|
"body_dimensions": {
|
|
|
"shoulder_width": shoulder_width,
|
|
|
"hip_width": hip_width,
|
|
|
"torso_height": torso_height
|
|
|
}
|
|
|
}
|
|
|
|
|
|
def process_video(self, video_path: str, log_file: str):
|
|
|
provided_video = cv2.VideoCapture(video_path)
|
|
|
if not provided_video.isOpened():
|
|
|
raise Exception("Cannot open video file.")
|
|
|
|
|
|
if os.path.exists(log_file):
|
|
|
os.remove(log_file)
|
|
|
|
|
|
while provided_video.isOpened():
|
|
|
ok, frame = provided_video.read()
|
|
|
if not ok:
|
|
|
break
|
|
|
frame, landmarks = self.detect_pose(frame, self.pose_video, display=False)
|
|
|
if landmarks:
|
|
|
body_points = self.body_angles(landmarks)
|
|
|
self.log_landmarks(body_points, log_file)
|
|
|
|
|
|
provided_video.release()
|
|
|
|
|
|
def log_landmarks(self, body_points: dict, log_file: str):
|
|
|
log_entry = {
|
|
|
"left_elbow_angles": body_points["elbow_angles"][0],
|
|
|
"right_elbow_angles": body_points["elbow_angles"][1],
|
|
|
"left_shoulder_angles": body_points["shoulder_angles"][0],
|
|
|
"right_shoulder_angles": body_points["shoulder_angles"][1],
|
|
|
"left_knee_angles": body_points["knee_angles"][0],
|
|
|
"right_knee_angles": body_points["knee_angles"][1],
|
|
|
"left_ankle_angles": body_points["ankle_angles"][0],
|
|
|
"right_ankle_angles": body_points["ankle_angles"][1],
|
|
|
"left_hip_angles": body_points["hip_angles"][0],
|
|
|
"right_hip_angles": body_points["hip_angles"][1],
|
|
|
"left_wrist_pinky_angle": body_points["wrist_pinky_angles"][0],
|
|
|
"right_wrist_pinky_angle": body_points["wrist_pinky_angles"][1],
|
|
|
"left_hip_to_hip": body_points["hip_to_hip"][0],
|
|
|
"right_hip_to_hip": body_points["hip_to_hip"][1]
|
|
|
}
|
|
|
|
|
|
with open(log_file, "a", newline='') as file:
|
|
|
writer = csv.writer(file)
|
|
|
if file.tell() == 0:
|
|
|
writer.writerow(log_entry.keys())
|
|
|
writer.writerow(log_entry.values())
|
|
|
|
|
|
def process_video_and_scale(self, video_path: str, csv_path: str, json_path: str):
|
|
|
self.process_video(video_path, csv_path)
|
|
|
self.std_scaler(csv_path, json_path)
|
|
|
|
|
|
return {"csv_path": csv_path, "json_path": json_path}
|
|
|
|
|
|
def predict(self, instances):
|
|
|
if not instances or "video_path" not in instances[0]:
|
|
|
raise ValueError("Invalid input format. Expected a dictionary with 'video_path', 'csv_path', and 'json_path' keys.")
|
|
|
|
|
|
video_path = instances[0].get("video_path")
|
|
|
csv_path = instances[0].get("csv_path")
|
|
|
json_path = instances[0].get("json_path")
|
|
|
|
|
|
print(f"Processing video: {video_path}, saving to {csv_path}, normalizing data in {json_path}")
|
|
|
|
|
|
result = self.process_video_and_scale(video_path, csv_path, json_path)
|
|
|
return {"predictions": result}
|
|
|
|
|
|
def save_model_config(self, filename):
|
|
|
config = {"smooth_landmarks": True}
|
|
|
with open(filename, "w") as file:
|
|
|
json.dump(config, file)
|
|
|
print(f"Model configuration saved to {filename}")
|
|
|
|
|
|
@staticmethod
|
|
|
def load_model_config(filename):
|
|
|
with open(filename, "r") as file:
|
|
|
config = json.load(file)
|
|
|
model = PoseEstimationModel()
|
|
|
print(f"Model configuration loaded from {filename}")
|
|
|
return model
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
model = PoseEstimationModel()
|
|
|
model.save_model_config('model_config.json')
|
|
|
loaded_model = PoseEstimationModel.load_model_config('model_config.json')
|
|
|
video_path = 'input_video.mp4'
|
|
|
log_file = 'output_log.csv'
|
|
|
loaded_model.process_video(video_path, log_file)
|
|
|
print("Video processed successfully! Pose data saved in:", log_file) |