import numpy as np
from ultralytics import YOLO
import cv2
import mediapipe as mp
import subprocess
import time
# Load YOLO Pose Detection model
model = YOLO("yolov8n-pose.pt", verbose=False) # Adjust for accuracy if needed
# Initialize MediaPipe Face Mesh
mp_face_mesh = mp.solutions.face_mesh
face_mesh = mp_face_mesh.FaceMesh(static_image_mode=False, max_num_faces=1, min_detection_confidence=0.5)
def detect_activity(keypoints, face_landmarks, previous_keypoints=None):
    """
    Detect activity based on pose keypoints and facial landmarks.

    :param keypoints: A numpy array of shape (17, 2) with the (x, y) COCO pose keypoints.
    :param face_landmarks: A list of facial landmarks for detecting lip movement.
    :param previous_keypoints: A numpy array of shape (17, 2) with the keypoints from the previous frame.
    :return: Detected activity as a string.
    """
    def distance(pt1, pt2):
        return np.linalg.norm(np.array(pt1) - np.array(pt2))

    # COCO keypoint indices used by the YOLOv8 pose model
    NOSE, L_SHOULDER, R_SHOULDER, L_HIP, R_HIP, L_KNEE, R_KNEE, L_ANKLE, R_ANKLE, L_WRIST, R_WRIST = (
        0, 5, 6, 11, 12, 13, 14, 15, 16, 9, 10
    )

    if keypoints is None or len(keypoints) == 0:
        return "None"

    # Extract important keypoints
    nose = keypoints[NOSE]
    left_shoulder = keypoints[L_SHOULDER]
    right_shoulder = keypoints[R_SHOULDER]
    left_hip = keypoints[L_HIP]
    right_hip = keypoints[R_HIP]
    left_knee = keypoints[L_KNEE]
    right_knee = keypoints[R_KNEE]
    left_ankle = keypoints[L_ANKLE]
    right_ankle = keypoints[R_ANKLE]
    left_wrist = keypoints[L_WRIST]
    right_wrist = keypoints[R_WRIST]

    # Calculate distances and movement metrics
    torso_length = distance(nose, (left_hip + right_hip) / 2)
    arm_movement = distance(left_wrist, left_shoulder) + distance(right_wrist, right_shoulder)
    leg_movement = distance(left_knee, left_ankle) + distance(right_knee, right_ankle)
    total_movement = arm_movement + leg_movement

    # Detect mouth movement for talking
    is_talking = False
    if face_landmarks:
        upper_lip = face_landmarks[13]  # MediaPipe index for the upper lip
        lower_lip = face_landmarks[14]  # MediaPipe index for the lower lip
        lip_distance = distance(upper_lip, lower_lip)
        is_talking = lip_distance > 5  # Pixel threshold; tune for the input resolution

    # Classify the activity from the movement heuristics
    if is_talking:
        return "Talking"
    elif total_movement > torso_length * 1.2:  # Large synchronized movement suggests dancing
        return "Dancing"
    elif leg_movement > torso_length * 0.3:
        return "Running"
    elif arm_movement < torso_length * 0.2 and leg_movement < torso_length * 0.1:
        return "Standing"
    elif 0.1 < leg_movement < torso_length * 0.3:
        return "Walking"
    else:
        return "Other Activity"
def process_gif(gif_path, confidence_score):
    """
    Detect keypoints in a GIF and classify activities.

    :param gif_path: Path to the input GIF.
    :param confidence_score: Minimum confidence for pose detections.
    :return: Path to the re-encoded annotated video.
    """
    cap = cv2.VideoCapture(gif_path)
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
    fps = int(cap.get(cv2.CAP_PROP_FPS))
    fourcc = cv2.VideoWriter_fourcc(*'mp4v')
    output_path = f"annotated_{gif_path}"
    out = cv2.VideoWriter(output_path, fourcc, fps, (width, height))
    previous_keypoints = None

    while cap.isOpened():
        ret, frame = cap.read()
        if not ret:
            break

        # Perform pose detection
        results = model.predict(source=frame, conf=confidence_score, save=False, verbose=False)
        if results is None or len(results) == 0 or not hasattr(results[0], 'keypoints') or results[0].keypoints is None:
            continue

        for result in results:
            for pose in result.keypoints.xy:  # Loop through detected people
                keypoints = np.array(pose)

                # Detect facial landmarks using MediaPipe
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                face_results = face_mesh.process(rgb_frame)
                face_landmarks = []
                if face_results.multi_face_landmarks:
                    for face_landmark in face_results.multi_face_landmarks:
                        for landmark in face_landmark.landmark:
                            x = int(landmark.x * frame.shape[1])
                            y = int(landmark.y * frame.shape[0])
                            face_landmarks.append((x, y))

                activity = detect_activity(keypoints, face_landmarks, previous_keypoints)

                # Annotate the activity on the frame
                cv2.putText(frame, activity, (50, 50), cv2.FONT_HERSHEY_SIMPLEX, 1, (255, 0, 0), 2, cv2.LINE_AA)

                # Draw keypoints
                for x, y in keypoints:
                    cv2.circle(frame, (int(x), int(y)), 5, (255, 0, 0), -1)

                # Draw facial landmarks
                for x, y in face_landmarks:
                    cv2.circle(frame, (x, y), 2, (0, 255, 0), -1)

                previous_keypoints = keypoints  # Update for temporal analysis

        out.write(frame)
        # cv2.imshow("Pose Activity Detection", frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):  # Press 'q' to quit
            break

    cap.release()
    out.release()

    # Re-encode the annotated video with ffmpeg so the output is widely playable
    repaired_path = f"repaired_{output_path}"
    command = [
        'ffmpeg', '-y',
        '-i', output_path,
        '-c:v', 'libx264',
        '-c:a', 'aac',
        repaired_path
    ]
    if retry_file_access(output_path):
        # Run the command
        try:
            subprocess.run(command, check=True)
            print("Video processed successfully")
        except subprocess.CalledProcessError as e:
            print(f"Error occurred: {e}")
    return repaired_path
def retry_file_access(file_path, retries=3, delay=2):
    """Poll until the file can be opened, retrying a few times before giving up."""
    for i in range(retries):
        try:
            # Try accessing the file
            with open(file_path, 'rb'):
                return True
        except IOError:
            print(f"File is not ready yet. Retrying... {i+1}/{retries}")
            time.sleep(delay)
    print("File is not accessible after multiple retries.")
    return False
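
# Example usage: a minimal sketch, assuming a GIF named "input.gif" exists in the
# working directory (hypothetical path) and a 0.5 minimum pose confidence.
if __name__ == "__main__":
    annotated_video = process_gif("input.gif", 0.5)
    print(f"Annotated video written to {annotated_video}")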