# pose_analysis.py
import os
import subprocess
import time

import cv2
import mediapipe as mp
import numpy as np
from ultralytics import YOLO

# Load the YOLOv8 pose-estimation model (swap in a larger variant such as
# yolov8s-pose.pt if higher accuracy is needed)
model = YOLO("yolov8n-pose.pt", verbose=False)

# Initialize MediaPipe Face Mesh (video mode, one face at a time)
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(
    static_image_mode=False, max_num_faces=1, min_detection_confidence=0.5
)
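
# A minimal sketch of how one YOLO pose result maps onto the (17, 2) keypoint
# array that detect_activity() consumes below (assumes at least one person was
# detected; "frame.jpg" is a hypothetical input):
#
#     results = model.predict(source="frame.jpg", conf=0.5, verbose=False)
#     keypoints = np.array(results[0].keypoints.xy[0])  # (17, 2) array of (x, y) pixels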

def detect_activity(keypoints, face_landmarks, previous_keypoints=None):
    """
    Classify an activity from pose keypoints and facial landmarks.

    :param keypoints: A numpy array of shape (17, 2) of (x, y) pixel keypoints
                      in COCO order, as produced by YOLOv8 pose models.
    :param face_landmarks: A list of (x, y) facial landmarks used to detect lip movement.
    :param previous_keypoints: Keypoints from the previous frame, shape (17, 2).
                               Currently unused; reserved for temporal analysis.
    :return: Detected activity as a string.
    """
    def distance(pt1, pt2):
        return np.linalg.norm(np.array(pt1) - np.array(pt2))

    # Keypoint indices (COCO-17 layout used by YOLOv8 pose models)
    NOSE, L_SHOULDER, R_SHOULDER, L_HIP, R_HIP, L_KNEE, R_KNEE, L_ANKLE, R_ANKLE, L_WRIST, R_WRIST = (
        0, 5, 6, 11, 12, 13, 14, 15, 16, 9, 10
    )
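    # For reference, the full COCO-17 index layout these constants come from:
    #   0 nose, 1 left_eye, 2 right_eye, 3 left_ear, 4 right_ear,
    #   5 left_shoulder, 6 right_shoulder, 7 left_elbow, 8 right_elbow,
    #   9 left_wrist, 10 right_wrist, 11 left_hip, 12 right_hip,
    #   13 left_knee, 14 right_knee, 15 left_ankle, 16 right_ankle
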
    if keypoints is None or len(keypoints) == 0:
        return "None"

    # Extract the keypoints used by the heuristics below
    nose = keypoints[NOSE]
    left_shoulder = keypoints[L_SHOULDER]
    right_shoulder = keypoints[R_SHOULDER]
    left_hip = keypoints[L_HIP]
    right_hip = keypoints[R_HIP]
    left_knee = keypoints[L_KNEE]
    right_knee = keypoints[R_KNEE]
    left_ankle = keypoints[L_ANKLE]
    right_ankle = keypoints[R_ANKLE]
    left_wrist = keypoints[L_WRIST]
    right_wrist = keypoints[R_WRIST]
    # Distance-based movement metrics (pixel units; the activity thresholds
    # below scale with torso_length to stay resolution-independent)
    torso_length = distance(nose, (left_hip + right_hip) / 2)
    arm_movement = distance(left_wrist, left_shoulder) + distance(right_wrist, right_shoulder)
    leg_movement = distance(left_knee, left_ankle) + distance(right_knee, right_ankle)
    total_movement = arm_movement + leg_movement

    # Detect mouth movement for talking
    is_talking = False
    if face_landmarks:
        upper_lip = face_landmarks[13]  # MediaPipe Face Mesh index for the upper inner lip
        lower_lip = face_landmarks[14]  # MediaPipe Face Mesh index for the lower inner lip
        lip_distance = distance(upper_lip, lower_lip)
        is_talking = lip_distance > 5  # Pixel threshold; tune for your frame resolution
    # Classify the activity with simple geometric heuristics
    if is_talking:
        return "Talking"
    elif total_movement > torso_length * 1.2:  # Large synchronized arm and leg movement
        return "Dancing"
    elif leg_movement > torso_length * 0.3:
        return "Running"
    elif arm_movement < torso_length * 0.2 and leg_movement < torso_length * 0.1:
        return "Standing"
    elif torso_length * 0.1 < leg_movement < torso_length * 0.3:
        return "Walking"
    else:
        return "Other Activity"

def process_gif(gif_path, confidence_score):
    """
    Detect keypoints in a GIF, classify activities frame by frame, and write an
    annotated, re-encoded video.

    :param gif_path: Path to the input GIF.
    :param confidence_score: Minimum confidence for YOLO pose detections.
    :return: Path to the re-encoded annotated video.
    """
    cap = cv2.VideoCapture(gif_path)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS)) or 10  # GIFs often report 0 FPS; fall back to a default
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    base_name = os.path.splitext(os.path.basename(gif_path))[0]
    output_path = f"annotated_{base_name}.mp4"  # .mp4 to match the mp4v codec
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    previous_keypoints = None
    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Perform pose detection at the caller-supplied confidence threshold
        results = model.predict(source=frame, conf=confidence_score, save=False, verbose=False)
        if not results or results[0].keypoints is None:
            out.write(frame)  # Keep the frame in the output even when no pose is found
            continue
        # Detect facial landmarks once per frame with MediaPipe
        rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
        face_results = face_mesh.process(rgb_frame)
        face_landmarks = []
        if face_results.multi_face_landmarks:
            for face_landmark in face_results.multi_face_landmarks:
                for landmark in face_landmark.landmark:
                    x = int(landmark.x * frame.shape[1])
                    y = int(landmark.y * frame.shape[0])
                    face_landmarks.append((x, y))

        for result in results:
            for pose in result.keypoints.xy:  # Loop through detected people
                keypoints = np.array(pose)
                activity = detect_activity(keypoints, face_landmarks, previous_keypoints)

                # Annotate the activity on the frame
                cv2.putText(frame, activity, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, cv2.LINE_AA)

                # Draw pose keypoints
                for x, y in keypoints:
                    cv2.circle(frame, (int(x), int(y)), 5, (255, 0, 0), -1)

                # Draw facial landmarks
                for x, y in face_landmarks:
                    cv2.circle(frame, (x, y), 2, (0, 255, 0), -1)

                previous_keypoints = keypoints  # Update for temporal analysis

        out.write(frame)
        # Optional live preview; uncomment to watch frames as they are processed
        # cv2.imshow("Pose Activity Detection", frame)
        # if cv2.waitKey(1) & 0xFF == ord('q'):  # Press 'q' to quit
        #     break
    cap.release()
    out.release()

    # Re-encode with libx264 so the result plays in browsers and most players
    # (OpenCV's mp4v output has patchy support)
    repaired_path = f"repaired_{output_path}"
    command = [
        'ffmpeg', '-y',
        '-i', output_path,
        '-c:v', 'libx264',
        '-c:a', 'aac',
        repaired_path
    ]
    if retry_file_access(output_path):
        # Run the command once the writer has flushed the file to disk
        try:
            subprocess.run(command, check=True)
            print("Video processed successfully")
        except subprocess.CalledProcessError as e:
            print(f"Error occurred: {e}")
    return repaired_path
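
# Naming convention sketch (hypothetical input): for "dance.gif" the writer
# above produces "annotated_dance.mp4" and process_gif returns the re-encoded
# "repaired_annotated_dance.mp4".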

def retry_file_access(file_path, retries=3, delay=2):
    """Poll file_path until it is readable, giving up after `retries` attempts."""
    for i in range(retries):
        try:
            # Try opening the file for reading
            with open(file_path, 'rb'):
                return True
        except IOError:
            print(f"File is not ready yet. Retrying... {i+1}/{retries}")
            time.sleep(delay)
    print("File is not accessible after multiple retries.")
    return False
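

if __name__ == "__main__":
    # Minimal end-to-end sketch, assuming a GIF named "sample.gif" sits next to
    # this script (hypothetical file name; adjust before running)
    annotated_path = process_gif("sample.gif", confidence_score=0.5)
    print(f"Annotated video written to: {annotated_path}")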