OpenCvModel / opencvmodel.py
MUSKAN17's picture
Add Application files
d7207de verified
import cv2
import csv
import os
import mediapipe as mp
import json
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import math
import logging
# Suppress TensorFlow and MediaPipe logging
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '0'
logging.getLogger('mediapipe').setLevel(logging.ERROR)
class PoseEstimationModel:
def __init__(self):
self.mp_pose = mp.solutions.pose
self.pose_video = self.mp_pose.Pose(smooth_landmarks=True)
def detect_pose(self, image, pose, display=True):
output_image = image.copy()
imageRGB = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
results = pose.process(imageRGB)
height, width, _ = image.shape
landmarks = []
if results.pose_landmarks:
for landmark in results.pose_landmarks.landmark:
landmarks.append((int(landmark.x * width), int(landmark.y * height),
(landmark.z * width)))
if display:
plt.figure(figsize=[22,22])
plt.subplot(121);plt.imshow(image[:,:,::-1]);plt.title("Original Image");plt.axis('off');
plt.subplot(122);plt.imshow(output_image[:,:,::-1]);plt.title("Output Image");plt.axis('off');
plt.show()
else:
return output_image, landmarks
def calculate_angle(self, landmark1, landmark2, landmark3):
x1, y1, _ = landmark1
x2, y2, _ = landmark2
x3, y3, _ = landmark3
angle = math.degrees(math.atan2(y3 - y2, x3 - x2) - math.atan2(y1 - y2, x1 - x2))
if angle < 0:
angle += 360
if angle > 180:
angle = 360 - angle
return round(angle, 2)
def calculate_distance(self, point1, point2):
length = np.linalg.norm(np.array(point1) - np.array(point2))
return round(length, 2)
def body_angles(self, landmarks):
left_elbow_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.LEFT_SHOULDER.value],
landmarks[self.mp_pose.PoseLandmark.LEFT_ELBOW.value],
landmarks[self.mp_pose.PoseLandmark.LEFT_WRIST.value])
right_elbow_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value],
landmarks[self.mp_pose.PoseLandmark.RIGHT_ELBOW.value],
landmarks[self.mp_pose.PoseLandmark.RIGHT_WRIST.value])
left_shoulder_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.LEFT_ELBOW.value],
landmarks[self.mp_pose.PoseLandmark.LEFT_SHOULDER.value],
landmarks[self.mp_pose.PoseLandmark.LEFT_HIP.value])
right_shoulder_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.RIGHT_HIP.value],
landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value],
landmarks[self.mp_pose.PoseLandmark.RIGHT_ELBOW.value])
left_knee_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.LEFT_HIP.value],
landmarks[self.mp_pose.PoseLandmark.LEFT_KNEE.value],
landmarks[self.mp_pose.PoseLandmark.LEFT_ANKLE.value])
right_knee_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.RIGHT_HIP.value],
landmarks[self.mp_pose.PoseLandmark.RIGHT_KNEE.value],
landmarks[self.mp_pose.PoseLandmark.RIGHT_ANKLE.value])
left_ankle_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.LEFT_KNEE.value],
landmarks[self.mp_pose.PoseLandmark.LEFT_ANKLE.value],
landmarks[self.mp_pose.PoseLandmark.LEFT_FOOT_INDEX.value])
right_ankle_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.RIGHT_KNEE.value],
landmarks[self.mp_pose.PoseLandmark.RIGHT_ANKLE.value],
landmarks[self.mp_pose.PoseLandmark.RIGHT_FOOT_INDEX.value])
right_hip_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value],
landmarks[self.mp_pose.PoseLandmark.RIGHT_HIP.value],
landmarks[self.mp_pose.PoseLandmark.RIGHT_KNEE.value])
left_hip_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.LEFT_SHOULDER.value],
landmarks[self.mp_pose.PoseLandmark.LEFT_HIP.value],
landmarks[self.mp_pose.PoseLandmark.LEFT_KNEE.value])
right_hip_to_hip = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.RIGHT_KNEE.value],
landmarks[self.mp_pose.PoseLandmark.RIGHT_HIP.value],
landmarks[self.mp_pose.PoseLandmark.LEFT_HIP.value])
left_hip_to_hip = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.RIGHT_HIP.value],
landmarks[self.mp_pose.PoseLandmark.LEFT_HIP.value],
landmarks[self.mp_pose.PoseLandmark.LEFT_KNEE.value])
left_wrist_pinky_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.LEFT_ELBOW.value],
landmarks[self.mp_pose.PoseLandmark.LEFT_WRIST.value],
landmarks[self.mp_pose.PoseLandmark.LEFT_PINKY.value])
right_wrist_pinky_angle = self.calculate_angle(landmarks[self.mp_pose.PoseLandmark.RIGHT_ELBOW.value],
landmarks[self.mp_pose.PoseLandmark.RIGHT_WRIST.value],
landmarks[self.mp_pose.PoseLandmark.RIGHT_PINKY.value])
left_leg_length = self.calculate_distance(
landmarks[self.mp_pose.PoseLandmark.LEFT_HIP.value],
landmarks[self.mp_pose.PoseLandmark.LEFT_ANKLE.value]
)
right_leg_length = self.calculate_distance(
landmarks[self.mp_pose.PoseLandmark.RIGHT_HIP.value],
landmarks[self.mp_pose.PoseLandmark.RIGHT_ANKLE.value]
)
shoulder_width = self.calculate_distance(
landmarks[self.mp_pose.PoseLandmark.LEFT_SHOULDER.value],
landmarks[self.mp_pose.PoseLandmark.RIGHT_SHOULDER.value]
)
hip_width = self.calculate_distance(
landmarks[self.mp_pose.PoseLandmark.LEFT_HIP.value],
landmarks[self.mp_pose.PoseLandmark.RIGHT_HIP.value]
)
torso_height = self.calculate_distance(
landmarks[self.mp_pose.PoseLandmark.LEFT_SHOULDER.value],
landmarks[self.mp_pose.PoseLandmark.LEFT_HIP.value]
)
return {
"elbow_angles": [left_elbow_angle, right_elbow_angle],
"shoulder_angles": [left_shoulder_angle, right_shoulder_angle],
"knee_angles": [left_knee_angle, right_knee_angle],
"ankle_angles": [left_ankle_angle, right_ankle_angle],
"hip_angles": [left_hip_angle, right_hip_angle],
"wrist_pinky_angles": [left_wrist_pinky_angle, right_wrist_pinky_angle],
"hip_to_hip": [left_hip_to_hip, right_hip_to_hip],
"leg_lengths": [left_leg_length, right_leg_length],
"body_dimensions": {
"shoulder_width": shoulder_width,
"hip_width": hip_width,
"torso_height": torso_height
}
}
def process_video(self, video_path: str, log_file: str):
provided_video = cv2.VideoCapture(video_path)
if not provided_video.isOpened():
raise Exception("Cannot open video file.")
if os.path.exists(log_file):
os.remove(log_file)
while provided_video.isOpened():
ok, frame = provided_video.read()
if not ok:
break
frame, landmarks = self.detect_pose(frame, self.pose_video, display=False)
if landmarks:
body_points = self.body_angles(landmarks)
self.log_landmarks(body_points, log_file)
provided_video.release()
def log_landmarks(self, body_points: dict, log_file: str):
log_entry = {
"left_elbow_angles": body_points["elbow_angles"][0],
"right_elbow_angles": body_points["elbow_angles"][1],
"left_shoulder_angles": body_points["shoulder_angles"][0],
"right_shoulder_angles": body_points["shoulder_angles"][1],
"left_knee_angles": body_points["knee_angles"][0],
"right_knee_angles": body_points["knee_angles"][1],
"left_ankle_angles": body_points["ankle_angles"][0],
"right_ankle_angles": body_points["ankle_angles"][1],
"left_hip_angles": body_points["hip_angles"][0],
"right_hip_angles": body_points["hip_angles"][1],
"left_wrist_pinky_angle": body_points["wrist_pinky_angles"][0],
"right_wrist_pinky_angle": body_points["wrist_pinky_angles"][1],
"left_hip_to_hip": body_points["hip_to_hip"][0],
"right_hip_to_hip": body_points["hip_to_hip"][1]
}
with open(log_file, "a", newline='') as file:
writer = csv.writer(file)
if file.tell() == 0:
writer.writerow(log_entry.keys()) # Write header if empty
writer.writerow(log_entry.values())
def process_video_and_scale(self, video_path: str, csv_path: str, json_path: str):
self.process_video(video_path, csv_path)
self.std_scaler(csv_path, json_path)
return {"csv_path": csv_path, "json_path": json_path}
def predict(self, instances):
if not instances or "video_path" not in instances[0]:
raise ValueError("Invalid input format. Expected a dictionary with 'video_path', 'csv_path', and 'json_path' keys.")
video_path = instances[0].get("video_path")
csv_path = instances[0].get("csv_path")
json_path = instances[0].get("json_path")
print(f"Processing video: {video_path}, saving to {csv_path}, normalizing data in {json_path}")
result = self.process_video_and_scale(video_path, csv_path, json_path)
return {"predictions": result}
def save_model_config(self, filename):
config = {"smooth_landmarks": True}
with open(filename, "w") as file:
json.dump(config, file)
print(f"Model configuration saved to {filename}")
@staticmethod
def load_model_config(filename):
with open(filename, "r") as file:
config = json.load(file)
model = PoseEstimationModel()
print(f"Model configuration loaded from {filename}")
return model
if __name__ == "__main__":
model = PoseEstimationModel()
model.save_model_config('model_config.json')
loaded_model = PoseEstimationModel.load_model_config('model_config.json')
video_path = 'input_video.mp4'
log_file = 'output_log.csv'
loaded_model.process_video(video_path, log_file)
print("Video processed successfully! Pose data saved in:", log_file)